1use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::process::{Command, Stdio};
38use std::time::Instant;
39
/// Work-queue database that tracks per-item enrichment progress.
///
/// This is a relative path, so the queue file is created in the current working
/// directory of the process, not next to the graph database.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Initial rate-limit backoff in seconds; `run` seeds its backoff counters
/// (`backoff_secs` and the per-worker `w_backoff`) from this value.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default for `--min-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default for `--max-output-chars` (body-enrich).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
48
/// JSON Schema for the LLM's entity/relationship extraction reply.
///
/// Requires an `entities` array of `{name, entity_type}` objects and a
/// `relationships` array of `{source, target, relation, strength}` objects. These
/// are the shapes `persist_memory_bindings` deserializes. `entity_type` and
/// `relation` are closed enums and `strength` is bounded to `[0, 1]`. Every
/// object forbids extra properties.
/// NOTE(review): presumably paired with `BINDINGS_PROMPT`; the call site is not
/// visible here.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
92
/// JSON Schema for an entity-description reply: one required `description`
/// string and nothing else.
/// NOTE(review): presumably paired with `ENTITY_DESCRIPTION_PROMPT_PREFIX`.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for a body-enrich reply: one required `enriched_body` string
/// and nothing else.
/// NOTE(review): presumably paired with `BODY_ENRICH_PROMPT_PREFIX`.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
110
/// Prompt asking the model to audit a relationship weight against a fixed
/// 0.9 / 0.7 / 0.5 / 0.3 scale.
///
/// The scale matches the strength guidance in `BINDINGS_PROMPT`.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
    Scale:\n\
    - 0.9 = vital hard dependency (A cannot function without B)\n\
    - 0.7 = important design relationship (A strongly supports/enables B)\n\
    - 0.5 = useful contextual link (A and B share relevant context)\n\
    - 0.3 = weak reference (A mentions B without strong coupling)\n\n\
    Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for a weight-calibration reply: `calibrated_weight` in `[0, 1]`
/// plus free-text `reasoning`, both required.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
129
/// Prompt asking the model to replace a generic relation with the real
/// semantic one, chosen from the 11 canonical relations listed in the prompt.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
    Valid canonical relations (pick exactly one):\n\
    - depends-on: A cannot function without B\n\
    - uses: A utilizes B but could substitute it\n\
    - supports: A reinforces or enables B\n\
    - causes: A triggers or produces B\n\
    - fixes: A resolves a problem in B\n\
    - contradicts: A conflicts with or invalidates B\n\
    - applies-to: A is relevant to or scoped within B\n\
    - follows: A comes after B in sequence\n\
    - replaces: A substitutes B\n\
    - tracked-in: A is monitored in B\n\
    - related: A and B share context (use sparingly)\n\n\
    Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for a relation-reclassification reply.
///
/// `relation` is constrained to exactly the canonical relations the prompt
/// enumerates, mirroring the relation enum in `BINDINGS_SCHEMA`. Structured
/// output therefore cannot return an off-vocabulary relation. `strength` is
/// bounded to `[0, 1]`.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
156
/// Prompt asking the model whether two unconnected entities should be linked.
/// It uses one of the canonical relations, or the sentinel `"none"` when no
/// meaningful relationship exists.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
    Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
    If NO meaningful relationship exists, set relation to \"none\".\n\
    Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for an entity-connect reply.
///
/// `relation` is constrained to the canonical relations listed in the prompt
/// plus the `"none"` sentinel, so the reply is always one of the values the
/// prompt allows. `strength` is bounded to `[0, 1]`.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": {
      "type": "string",
      "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","replaces","tracked-in","related","none"]
    },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
173
/// Prompt asking the model to confirm or correct an entity's type, chosen
/// from the 10 valid entity types listed in the prompt.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
    Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
    If the current type is correct, keep it. If wrong, suggest the correct type.\n\
    Respond with the validated type and reasoning.";

/// JSON Schema for an entity-type validation reply.
///
/// `validated_type` is constrained to the entity types listed in the prompt
/// (the same enum as `entity_type` in `BINDINGS_SCHEMA`). A suggestion that
/// cannot be parsed as an entity type can therefore not be returned.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": {
      "type": "string",
      "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
    },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
190
/// Prompt asking the model to replace a generic or auto-generated memory
/// description with a 10-20 word semantic one (BAD/GOOD examples included).
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
    BAD: 'ingested from docs/auth.md'\n\
    GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
    Respond with the improved description and reasoning.";

/// JSON Schema for a description-enrich reply: `description` and `reasoning`
/// strings, both required.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
206
/// Prompt asking the model to classify a memory into a primary domain,
/// named in kebab-case with 2-4 words.
/// NOTE(review): the prompt does not ask for a confidence score, although
/// `DOMAIN_CLASSIFY_SCHEMA` requires `confidence`.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
    Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for a domain-classification reply: free-form `domain` string,
/// `confidence` in `[0, 1]` and `reasoning`, all required.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
221
/// Prompt asking the model to audit a memory and its entity bindings for
/// common graph-quality problems and to give an overall quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
    Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
    Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema for a graph-audit reply: `quality_score` in `[0, 1]`, an
/// `issues` array of `{kind, detail}` objects and `reasoning`.
///
/// Every object, including the nested issue items, sets
/// `"additionalProperties": false`, as all other schemas in this module do.
/// Strict structured-output modes reject object schemas that omit it.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
237
/// Prompt asking the model to synthesize entities, relationships and a
/// summary from a memory body, using the canonical relation vocabulary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
    Entity names: lowercase kebab-case, domain-specific.\n\
    Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
    Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema for a deep-research synthesis reply.
///
/// The entity and relationship item shapes match `BINDINGS_SCHEMA`:
/// - `entity_type` uses the same closed enum;
/// - `relation` is limited to the relations the prompt lists;
/// - `strength` is bounded to `[0, 1]`;
/// - every object, nested items included, sets `"additionalProperties": false`.
///
/// The last point is needed for strict structured-output modes.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["depends-on","uses","supports","causes","fixes","contradicts","applies-to","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
254
/// Prompt asking the model to restructure a raw or unstructured memory body
/// into clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
    Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
    Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for a body-extract reply: `restructured_body` and
/// `changes_summary` strings, both required.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
269
/// Prompt for entity/relationship extraction from a memory body.
///
/// The allowed relation list and the 0.9/0.7/0.5/0.3 strength scale match
/// the relation enum in `BINDINGS_SCHEMA` and the scale in
/// `WEIGHT_CALIBRATE_PROMPT`. 'mentions' is explicitly banned as a relation.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
    1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
    2. Typed relationships between entities with strength scores\n\n\
    Rules:\n\
    - Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
    - NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
    - Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
    - NEVER use 'mentions' as relationship type\n\
    - Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
    - Prefer fewer high-quality entities over many low-quality ones";
284
/// Prompt prefix for entity descriptions. It ends with "Entity name: ".
/// NOTE(review): presumably the caller appends the entity name (and type) to it;
/// the call site is not visible here.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for body enrichment. It ends just before the memory body, which
/// is presumably appended by the caller.
/// NOTE(review): the text says the target length is "provided in the system
/// context" — confirm the caller actually supplies the min/max output chars.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
288
/// Enrichment operation selected with `--operation`.
///
/// clap parses the variants and serde (de)serializes them in kebab-case
/// (`memory-bindings`, `entity-descriptions`, ...). Variant notes below are
/// plain comments so they do not alter the CLI's possible-value help text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Extract entities/relationships for memories with no entity links
    // (see `scan_unbound_memories`, `BINDINGS_PROMPT`).
    MemoryBindings,
    // Fill in missing entity descriptions; queued with item_type "entity"
    // (see `scan_entities_without_description`).
    EntityDescriptions,
    // Expand short memory bodies (see `scan_short_body_memories`).
    BodyEnrich,
    // Audit high relationship weights (presumably via `WEIGHT_CALIBRATE_PROMPT`).
    WeightCalibrate,
    // Replace generic relations (presumably via `RELATION_RECLASSIFY_PROMPT`).
    RelationReclassify,
    // Propose links between unconnected entities (presumably `ENTITY_CONNECT_PROMPT`).
    EntityConnect,
    // Verify entity types (presumably `ENTITY_TYPE_VALIDATE_PROMPT`).
    EntityTypeValidate,
    // Rewrite generic memory descriptions (presumably `DESCRIPTION_ENRICH_PROMPT`).
    DescriptionEnrich,
    // NOTE(review): no prompt/schema for this operation is visible in this view.
    CrossDomainBridges,
    // Assign a primary domain to memories (presumably `DOMAIN_CLASSIFY_PROMPT`).
    DomainClassify,
    // Score graph quality per memory (presumably `GRAPH_AUDIT_PROMPT`).
    GraphAudit,
    // Synthesize entities/relationships and a summary (presumably `DEEP_RESEARCH_SYNTH_PROMPT`).
    DeepResearchSynth,
    // Restructure raw memory bodies into markdown (presumably `BODY_EXTRACT_PROMPT`).
    BodyExtract,
}
324
/// LLM provider CLI that performs the enrichment calls (`--mode`).
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Claude Code CLI (`--mode claude-code`, the default). `run` locates it with
    // `find_claude_binary` and checks its version before use.
    ClaudeCode,
    // Codex CLI (`--mode codex`). `run` locates it with `find_codex_binary`.
    Codex,
}
333
334impl std::fmt::Display for EnrichMode {
335 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
336 match self {
337 EnrichMode::ClaudeCode => write!(f, "claude-code"),
338 EnrichMode::Codex => write!(f, "codex"),
339 }
340 }
341}
342
343#[derive(clap::Args)]
345#[command(
346 about = "Enrich graph memories and entities using an LLM provider",
347 after_long_help = "EXAMPLES:\n \
348 # Add missing entity bindings to all unbound memories\n \
349 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
350 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
351 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
352 # Expand short memory bodies (GAP-18)\n \
353 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
354 # Resume an interrupted body-enrich run\n \
355 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
356 # Retry only failed items from a previous run\n \
357 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
358 EXIT CODES:\n \
359 0 success\n \
360 1 validation error (bad args, binary not found)\n \
361 14 I/O error"
362)]
363pub struct EnrichArgs {
364 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
366 pub operation: EnrichOperation,
367
368 #[arg(long, value_enum, default_value = "claude-code")]
370 pub mode: EnrichMode,
371
372 #[arg(long, value_name = "N")]
374 pub limit: Option<usize>,
375
376 #[arg(long)]
378 pub dry_run: bool,
379
380 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
382 pub namespace: Option<String>,
383
384 #[arg(long, value_name = "PATH")]
387 pub claude_binary: Option<PathBuf>,
388
389 #[arg(long, value_name = "MODEL")]
391 pub claude_model: Option<String>,
392
393 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
395 pub claude_timeout: u64,
396
397 #[arg(long, value_name = "PATH")]
400 pub codex_binary: Option<PathBuf>,
401
402 #[arg(long, value_name = "MODEL")]
404 pub codex_model: Option<String>,
405
406 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
408 pub codex_timeout: u64,
409
410 #[arg(long, value_name = "USD")]
413 pub max_cost_usd: Option<f64>,
414
415 #[arg(long)]
418 pub resume: bool,
419
420 #[arg(long)]
422 pub retry_failed: bool,
423
424 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
427 pub min_output_chars: usize,
428
429 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
431 pub max_output_chars: usize,
432
433 #[arg(long, default_value_t = true)]
435 pub preserve_check: bool,
436
437 #[arg(long, value_name = "PATH")]
439 pub prompt_template: Option<PathBuf>,
440
441 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
445 pub llm_parallelism: u32,
446
447 #[arg(long)]
450 pub json: bool,
451
452 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
454 pub db: Option<String>,
455}
456
/// JSON progress event for a run phase, written with `emit_json`.
///
/// In the visible part of `run`, two phases are emitted:
/// - `"validate"` after the provider binary is resolved, with `binary_path`,
///   plus `version` for Claude Code;
/// - `"scan"` with the item counts and the configured parallelism.
///
/// Fields that are `None` are omitted from the JSON.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    phase: &'a str,
    // Built with `Path::to_str`, so it is `None` for non-UTF-8 paths.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
480
/// Per-item JSON progress event, written with `emit_json`.
///
/// `item` is the queue key of the item. `status` is a label; the dry-run path
/// in `run` uses `"preview"`. The optional fields carry whatever the item
/// produced and are omitted from the JSON when `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    item: &'a str,
    status: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    // Number of entities bound for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    // Number of relationships written for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    // Body length before/after enrichment (presumably characters — confirm at the emit site).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    // 0-based position of the item (from `enumerate` in `run`) out of `total`.
    index: usize,
    total: usize,
}
507
/// Final JSON summary record for a run, written with `emit_json`.
///
/// `summary` is set to `true` where `run` builds this struct (in the visible
/// dry-run path), marking the line as the summary record. `operation` is
/// built with `format!("{:?}", ...)`, so it holds the `Debug` spelling of
/// `EnrichOperation` (e.g. `MemoryBindings`), not the kebab-case CLI name.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    summary: bool,
    operation: String,
    items_total: usize,
    completed: usize,
    failed: usize,
    skipped: usize,
    cost_usd: f64,
    elapsed_ms: u64,
}
519
520use crate::output::emit_json_line as emit_json;
521
522fn open_queue_db(path: &str) -> Result<Connection, AppError> {
537 let conn = Connection::open(path)?;
538 conn.pragma_update(None, "journal_mode", "wal")?;
539 conn.execute_batch(
540 "CREATE TABLE IF NOT EXISTS queue (
541 id INTEGER PRIMARY KEY AUTOINCREMENT,
542 item_key TEXT NOT NULL UNIQUE,
543 item_type TEXT NOT NULL DEFAULT 'memory',
544 status TEXT NOT NULL DEFAULT 'pending',
545 memory_id INTEGER,
546 entity_id INTEGER,
547 entities INTEGER DEFAULT 0,
548 rels INTEGER DEFAULT 0,
549 error TEXT,
550 cost_usd REAL DEFAULT 0.0,
551 attempt INTEGER DEFAULT 0,
552 elapsed_ms INTEGER,
553 created_at TEXT DEFAULT (datetime('now')),
554 done_at TEXT
555 );
556 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
557 )?;
558 Ok(conn)
559}
560
561fn call_claude(
569 binary: &Path,
570 prompt: &str,
571 json_schema: &str,
572 input_text: &str,
573 model: Option<&str>,
574 timeout_secs: u64,
575) -> Result<(serde_json::Value, f64, bool), AppError> {
576 let result = crate::commands::claude_runner::run_claude(
577 binary,
578 prompt,
579 json_schema,
580 input_text,
581 model,
582 timeout_secs,
583 7,
584 )?;
585 Ok((result.value, result.cost_usd, result.is_oauth))
586}
587
588fn scan_unbound_memories(
596 conn: &Connection,
597 namespace: &str,
598 limit: Option<usize>,
599) -> Result<Vec<(i64, String, String)>, AppError> {
600 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
601 let sql = format!(
602 "SELECT m.id, m.name, m.body
603 FROM memories m
604 WHERE m.namespace = ?1
605 AND m.deleted_at IS NULL
606 AND NOT EXISTS (
607 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
608 )
609 ORDER BY m.id
610 {limit_clause}"
611 );
612 let mut stmt = conn.prepare(&sql)?;
613 let rows = stmt
614 .query_map(rusqlite::params![namespace], |r| {
615 Ok((
616 r.get::<_, i64>(0)?,
617 r.get::<_, String>(1)?,
618 r.get::<_, String>(2)?,
619 ))
620 })?
621 .collect::<Result<Vec<_>, _>>()?;
622 Ok(rows)
623}
624
625fn scan_entities_without_description(
629 conn: &Connection,
630 namespace: &str,
631 limit: Option<usize>,
632) -> Result<Vec<(i64, String, String)>, AppError> {
633 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
634 let sql = format!(
635 "SELECT id, name, type
636 FROM entities
637 WHERE namespace = ?1
638 AND (description IS NULL OR description = '')
639 ORDER BY id
640 {limit_clause}"
641 );
642 let mut stmt = conn.prepare(&sql)?;
643 let rows = stmt
644 .query_map(rusqlite::params![namespace], |r| {
645 Ok((
646 r.get::<_, i64>(0)?,
647 r.get::<_, String>(1)?,
648 r.get::<_, String>(2)?,
649 ))
650 })?
651 .collect::<Result<Vec<_>, _>>()?;
652 Ok(rows)
653}
654
655fn scan_short_body_memories(
659 conn: &Connection,
660 namespace: &str,
661 min_chars: usize,
662 limit: Option<usize>,
663) -> Result<Vec<(i64, String, String)>, AppError> {
664 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
665 let sql = format!(
666 "SELECT m.id, m.name, m.body
667 FROM memories m
668 WHERE m.namespace = ?1
669 AND m.deleted_at IS NULL
670 AND LENGTH(COALESCE(m.body,'')) < ?2
671 ORDER BY m.id
672 {limit_clause}"
673 );
674 let mut stmt = conn.prepare(&sql)?;
675 let rows = stmt
676 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
677 Ok((
678 r.get::<_, i64>(0)?,
679 r.get::<_, String>(1)?,
680 r.get::<_, String>(2)?,
681 ))
682 })?
683 .collect::<Result<Vec<_>, _>>()?;
684 Ok(rows)
685}
686
687#[allow(clippy::type_complexity)]
689fn scan_weight_candidates(
690 conn: &Connection,
691 namespace: &str,
692 limit: Option<usize>,
693) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
694 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
695 let sql = format!(
696 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
697 FROM relationships r \
698 JOIN entities e1 ON e1.id = r.source_id \
699 JOIN entities e2 ON e2.id = r.target_id \
700 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
701 ORDER BY r.weight DESC {limit_clause}"
702 );
703 let mut stmt = conn.prepare(&sql)?;
704 let rows = stmt
705 .query_map(rusqlite::params![namespace], |r| {
706 Ok((
707 r.get::<_, i64>(0)?,
708 r.get::<_, String>(1)?,
709 r.get::<_, String>(2)?,
710 r.get::<_, String>(3)?,
711 r.get::<_, f64>(4)?,
712 ))
713 })?
714 .collect::<Result<Vec<_>, _>>()?;
715 Ok(rows)
716}
717
718fn scan_generic_relations(
720 conn: &Connection,
721 namespace: &str,
722 limit: Option<usize>,
723) -> Result<Vec<(i64, String, String, String)>, AppError> {
724 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
725 let sql = format!(
726 "SELECT r.id, e1.name, e2.name, r.relation \
727 FROM relationships r \
728 JOIN entities e1 ON e1.id = r.source_id \
729 JOIN entities e2 ON e2.id = r.target_id \
730 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
731 ORDER BY r.id {limit_clause}"
732 );
733 let mut stmt = conn.prepare(&sql)?;
734 let rows = stmt
735 .query_map(rusqlite::params![namespace], |r| {
736 Ok((
737 r.get::<_, i64>(0)?,
738 r.get::<_, String>(1)?,
739 r.get::<_, String>(2)?,
740 r.get::<_, String>(3)?,
741 ))
742 })?
743 .collect::<Result<Vec<_>, _>>()?;
744 Ok(rows)
745}
746
747fn persist_memory_bindings(
756 conn: &Connection,
757 namespace: &str,
758 memory_id: i64,
759 entities_json: &serde_json::Value,
760 rels_json: &serde_json::Value,
761) -> Result<(usize, usize), AppError> {
762 #[derive(Deserialize)]
763 struct EntityItem {
764 name: String,
765 entity_type: String,
766 }
767 #[derive(Deserialize)]
768 struct RelItem {
769 source: String,
770 target: String,
771 relation: String,
772 strength: f64,
773 }
774
775 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
776 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
777
778 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
779 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
780
781 let mut ent_count = 0usize;
782 let mut rel_count = 0usize;
783
784 for item in &extracted_entities {
785 let entity_type = match item.entity_type.parse::<EntityType>() {
786 Ok(et) => et,
787 Err(_) => {
788 tracing::warn!(
789 target: "enrich",
790 entity = %item.name,
791 entity_type = %item.entity_type,
792 "entity type not recognized, skipping"
793 );
794 continue;
795 }
796 };
797 match entities::upsert_entity(
798 conn,
799 namespace,
800 &NewEntity {
801 name: item.name.clone(),
802 entity_type,
803 description: None,
804 },
805 ) {
806 Ok(eid) => {
807 let _ = entities::link_memory_entity(conn, memory_id, eid);
808 ent_count += 1;
809 }
810 Err(e) => {
811 tracing::warn!(
812 target: "enrich",
813 entity = %item.name,
814 error = %e,
815 "entity upsert skipped"
816 );
817 }
818 }
819 }
820
821 for rel in &extracted_rels {
822 let normalized = crate::parsers::normalize_relation(&rel.relation);
823 crate::parsers::warn_if_non_canonical(&normalized);
824
825 let src_name = crate::parsers::normalize_entity_name(&rel.source);
828 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
829 let src_id = entities::find_entity_id(conn, namespace, &src_name);
830 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
831 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
832 let new_rel = NewRelationship {
833 source: rel.source.clone(),
834 target: rel.target.clone(),
835 relation: normalized,
836 strength: rel.strength,
837 description: None,
838 };
839 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
840 rel_count += 1;
841 }
842 }
843 }
844
845 Ok((ent_count, rel_count))
846}
847
848fn persist_entity_description(
850 conn: &Connection,
851 entity_id: i64,
852 description: &str,
853) -> Result<(), AppError> {
854 conn.execute(
855 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
856 rusqlite::params![description, entity_id],
857 )?;
858 Ok(())
859}
860
861fn persist_enriched_body(
866 conn: &Connection,
867 namespace: &str,
868 memory_id: i64,
869 memory_name: &str,
870 new_body: &str,
871 paths: &crate::paths::AppPaths,
872) -> Result<(), AppError> {
873 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
875 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
876 rusqlite::params![memory_id],
877 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
878 )?;
879
880 let memory_type: String = conn.query_row(
881 "SELECT type FROM memories WHERE id=?1",
882 rusqlite::params![memory_id],
883 |r| r.get(0),
884 )?;
885
886 let description: String = conn.query_row(
887 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
888 rusqlite::params![memory_id],
889 |r| r.get(0),
890 )?;
891
892 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
893
894 let new_memory = memories::NewMemory {
895 namespace: namespace.to_string(),
896 name: memory_name.to_string(),
897 memory_type: memory_type.clone(),
898 description: description.clone(),
899 body: new_body.to_string(),
900 body_hash,
901 session_id: None,
902 source: "enrich".to_string(),
903 metadata: serde_json::Value::Object(serde_json::Map::new()),
904 };
905
906 memories::update(conn, memory_id, &new_memory, None)?;
907 memories::sync_fts_after_update(
908 conn,
909 memory_id,
910 &old_name,
911 &old_desc,
912 &old_body,
913 &new_memory.name,
914 &new_memory.description,
915 &new_memory.body,
916 )?;
917
918 let snippet: String = new_body.chars().take(200).collect();
920 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
921 let chunks_info = crate::chunking::split_into_chunks_hierarchical(new_body, tokenizer);
922 let embedding_result = if chunks_info.len() <= 1 {
923 crate::daemon::embed_passage_or_local(&paths.models, new_body)
924 } else {
925 let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
926 let mut ok = true;
927 for chunk in &chunks_info {
928 let text = crate::chunking::chunk_text(new_body, chunk);
929 match crate::daemon::embed_passage_or_local(&paths.models, text) {
930 Ok(emb) => chunk_embeddings.push(emb),
931 Err(e) => {
932 tracing::warn!(target: "enrich", error = %e, "chunk embedding failed");
933 ok = false;
934 break;
935 }
936 }
937 }
938 if ok {
939 Ok(crate::chunking::aggregate_embeddings(&chunk_embeddings))
940 } else {
941 crate::daemon::embed_passage_or_local(&paths.models, new_body)
942 }
943 };
944
945 if let Ok(embedding) = embedding_result {
946 if let Err(e) = memories::upsert_vec(
947 conn,
948 memory_id,
949 namespace,
950 &memory_type,
951 &embedding,
952 memory_name,
953 &snippet,
954 ) {
955 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
956 }
957 }
958
959 Ok(())
960}
961
962pub fn run(args: &EnrichArgs) -> Result<(), AppError> {
968 let started = Instant::now();
976
977 let paths = AppPaths::resolve(args.db.as_deref())?;
978 ensure_db_ready(&paths)?;
979 let conn = open_rw(&paths.db)?;
980 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
981
982 let provider_binary = match args.mode {
984 EnrichMode::ClaudeCode => {
985 let bin = find_claude_binary(args.claude_binary.as_deref())?;
986 let version = super::claude_runner::validate_claude_version(&bin)?;
987 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
988 emit_json(&PhaseEvent {
989 phase: "validate",
990 binary_path: bin.to_str(),
991 version: Some(&version),
992 items_total: None,
993 items_pending: None,
994 llm_parallelism: None,
995 });
996 bin
997 }
998 EnrichMode::Codex => {
999 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1001 emit_json(&PhaseEvent {
1002 phase: "validate",
1003 binary_path: bin.to_str(),
1004 version: None,
1005 items_total: None,
1006 items_pending: None,
1007 llm_parallelism: None,
1008 });
1009 bin
1010 }
1011 };
1012
1013 let scan_result = scan_operation(&conn, &namespace, args)?;
1015 let total = scan_result.len();
1016
1017 emit_json(&PhaseEvent {
1018 phase: "scan",
1019 binary_path: None,
1020 version: None,
1021 items_total: Some(total),
1022 items_pending: Some(total),
1023 llm_parallelism: Some(args.llm_parallelism),
1024 });
1025
1026 if args.dry_run {
1028 for (idx, key) in scan_result.iter().enumerate() {
1029 emit_json(&ItemEvent {
1030 item: key,
1031 status: "preview",
1032 memory_id: None,
1033 entity_id: None,
1034 entities: None,
1035 rels: None,
1036 chars_before: None,
1037 chars_after: None,
1038 cost_usd: None,
1039 elapsed_ms: None,
1040 error: None,
1041 index: idx,
1042 total,
1043 });
1044 }
1045 emit_json(&EnrichSummary {
1046 summary: true,
1047 operation: format!("{:?}", args.operation),
1048 items_total: total,
1049 completed: 0,
1050 failed: 0,
1051 skipped: 0,
1052 cost_usd: 0.0,
1053 elapsed_ms: started.elapsed().as_millis() as u64,
1054 });
1055 return Ok(());
1056 }
1057
1058 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1062
1063 if args.resume {
1064 let reset = queue_conn
1065 .execute(
1066 "UPDATE queue SET status='pending' WHERE status='processing'",
1067 [],
1068 )
1069 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1070 if reset > 0 {
1071 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1072 }
1073 }
1074
1075 if args.retry_failed {
1076 let count = queue_conn
1077 .execute(
1078 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1079 [],
1080 )
1081 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1082 tracing::info!(target: "enrich", count, "retrying failed items");
1083 }
1084
1085 if !args.resume && !args.retry_failed {
1086 queue_conn
1087 .execute("DELETE FROM queue", [])
1088 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1089 }
1090
1091 for (idx, key) in scan_result.iter().enumerate() {
1093 let item_type = match args.operation {
1094 EnrichOperation::EntityDescriptions => "entity",
1095 _ => "memory",
1096 };
1097 if let Err(e) = queue_conn.execute(
1098 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1099 rusqlite::params![key, item_type],
1100 ) {
1101 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1102 }
1103 let _ = idx; }
1105
1106 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1109 if parallelism > 1 {
1110 tracing::info!(
1111 target: "enrich",
1112 llm_parallelism = parallelism,
1113 "parallel LLM processing with bounded thread pool"
1114 );
1115 }
1116
1117 let mut completed = 0usize;
1118 let mut failed = 0usize;
1119 let mut skipped = 0usize;
1120 let mut cost_total = 0.0f64;
1121 let mut oauth_detected = false;
1122 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1123 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1124 let enrich_started = std::time::Instant::now();
1125
1126 let provider_timeout = match args.mode {
1127 EnrichMode::ClaudeCode => args.claude_timeout,
1128 EnrichMode::Codex => args.codex_timeout,
1129 };
1130
1131 let provider_model: Option<&str> = match args.mode {
1132 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1133 EnrichMode::Codex => args.codex_model.as_deref(),
1134 };
1135
1136 if parallelism > 1 {
1140 let stdout_mu = parking_lot::Mutex::new(());
1141 let budget = args.max_cost_usd;
1142 let operation = args.operation.clone();
1143 let mode = args.mode.clone();
1144 let min_oc = args.min_output_chars;
1145 let max_oc = args.max_output_chars;
1146 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1147
1148 struct WorkerResult {
1149 completed: usize,
1150 failed: usize,
1151 skipped: usize,
1152 cost: f64,
1153 oauth: bool,
1154 }
1155
1156 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1157 let handles: Vec<_> = (0..parallelism)
1158 .map(|worker_id| {
1159 let stdout_mu = &stdout_mu;
1160 let paths = &paths;
1161 let namespace = &namespace;
1162 let provider_binary = &provider_binary;
1163 let operation = &operation;
1164 let mode = &mode;
1165 let prompt_tpl = prompt_tpl.as_deref();
1166 s.spawn(move || {
1167 let w_conn = match open_rw(&paths.db) {
1168 Ok(c) => c,
1169 Err(e) => {
1170 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1171 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1172 }
1173 };
1174 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1175 Ok(c) => c,
1176 Err(e) => {
1177 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1178 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1179 }
1180 };
1181 let mut w_completed = 0usize;
1182 let mut w_failed = 0usize;
1183 let mut w_skipped = 0usize;
1184 let mut w_cost = 0.0f64;
1185 let mut w_oauth = false;
1186 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1187 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1188
1189 loop {
1190 if crate::shutdown_requested() {
1191 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1192 break;
1193 }
1194 if let Some(b) = budget {
1195 if !w_oauth && w_cost >= b {
1196 break;
1197 }
1198 }
1199 let pending: Option<(i64, String, String)> = w_queue
1200 .query_row(
1201 "UPDATE queue SET status='processing', attempt=attempt+1 \
1202 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1203 RETURNING id, item_key, item_type",
1204 [],
1205 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1206 )
1207 .ok();
1208 let (queue_id, item_key, _item_type) = match pending {
1209 Some(p) => p,
1210 None => break,
1211 };
1212 let item_started = Instant::now();
1213 let current_index = w_completed + w_failed + w_skipped;
1214
1215 let call_result = match operation {
1216 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1217 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1218 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, paths),
1219 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1220 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1221 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1222 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1223 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1224 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1225 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1226 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1227 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1228 };
1229
1230 match call_result {
1231 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1232 if is_oauth { w_oauth = true; }
1233 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1234 let _ = w_queue.execute(
1235 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1236 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1237 );
1238 w_completed += 1;
1239 if !is_oauth { w_cost += cost; }
1240 let _guard = stdout_mu.lock();
1241 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1242 }
1243 Ok(EnrichItemResult::Skipped { reason }) => {
1244 w_skipped += 1;
1245 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1246 let _guard = stdout_mu.lock();
1247 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1248 }
1249 Err(e) => {
1250 let err_str = format!("{e}");
1251 if matches!(e, AppError::RateLimited { .. }) {
1252 if crate::retry::is_kill_switch_active() {
1253 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1254 } else if std::time::Instant::now() >= w_deadline {
1255 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1256 } else {
1257 let half = w_backoff / 2;
1258 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1259 let actual_wait = half + jitter;
1260 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1261 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1262 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1263 w_backoff = (w_backoff * 2).min(900);
1264 continue;
1265 }
1266 }
1267 w_failed += 1;
1268 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1269 let _guard = stdout_mu.lock();
1270 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1271 }
1272 }
1273 }
1274 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1275 })
1276 })
1277 .collect();
1278 handles
1279 .into_iter()
1280 .map(|h| {
1281 h.join().unwrap_or(WorkerResult {
1282 completed: 0,
1283 failed: 0,
1284 skipped: 0,
1285 cost: 0.0,
1286 oauth: false,
1287 })
1288 })
1289 .collect()
1290 });
1291
1292 for r in &results {
1293 completed += r.completed;
1294 failed += r.failed;
1295 skipped += r.skipped;
1296 cost_total += r.cost;
1297 oauth_detected |= r.oauth;
1298 }
1299 } else {
1300 loop {
1302 if crate::shutdown_requested() {
1303 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
1304 break;
1305 }
1306
1307 if let Some(budget) = args.max_cost_usd {
1309 if !oauth_detected && cost_total >= budget {
1310 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1311 break;
1312 }
1313 }
1314
1315 let pending: Option<(i64, String, String)> = queue_conn
1317 .query_row(
1318 "UPDATE queue SET status='processing', attempt=attempt+1 \
1319 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1320 RETURNING id, item_key, item_type",
1321 [],
1322 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1323 )
1324 .ok();
1325
1326 let (queue_id, item_key, item_type) = match pending {
1327 Some(p) => p,
1328 None => break,
1329 };
1330
1331 let item_started = Instant::now();
1332 let current_index = completed + failed + skipped;
1333
1334 let call_result = match args.operation {
1335 EnrichOperation::MemoryBindings => call_memory_bindings(
1336 &conn,
1337 &namespace,
1338 &item_key,
1339 &provider_binary,
1340 provider_model,
1341 provider_timeout,
1342 &args.mode,
1343 ),
1344 EnrichOperation::EntityDescriptions => call_entity_description(
1345 &conn,
1346 &namespace,
1347 &item_key,
1348 &provider_binary,
1349 provider_model,
1350 provider_timeout,
1351 &args.mode,
1352 ),
1353 EnrichOperation::BodyEnrich => call_body_enrich(
1354 &conn,
1355 &namespace,
1356 &item_key,
1357 &provider_binary,
1358 provider_model,
1359 provider_timeout,
1360 &args.mode,
1361 args.min_output_chars,
1362 args.max_output_chars,
1363 args.prompt_template.as_deref(),
1364 &paths,
1365 ),
1366 EnrichOperation::WeightCalibrate => call_weight_calibrate(
1367 &conn,
1368 &namespace,
1369 &item_key,
1370 &provider_binary,
1371 provider_model,
1372 provider_timeout,
1373 &args.mode,
1374 ),
1375 EnrichOperation::RelationReclassify => call_relation_reclassify(
1376 &conn,
1377 &namespace,
1378 &item_key,
1379 &provider_binary,
1380 provider_model,
1381 provider_timeout,
1382 &args.mode,
1383 ),
1384 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
1385 call_entity_connect(
1386 &conn,
1387 &namespace,
1388 &item_key,
1389 &provider_binary,
1390 provider_model,
1391 provider_timeout,
1392 &args.mode,
1393 )
1394 }
1395 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
1396 &conn,
1397 &namespace,
1398 &item_key,
1399 &provider_binary,
1400 provider_model,
1401 provider_timeout,
1402 &args.mode,
1403 ),
1404 EnrichOperation::DescriptionEnrich => call_description_enrich(
1405 &conn,
1406 &namespace,
1407 &item_key,
1408 &provider_binary,
1409 provider_model,
1410 provider_timeout,
1411 &args.mode,
1412 ),
1413 EnrichOperation::DomainClassify => call_domain_classify(
1414 &conn,
1415 &namespace,
1416 &item_key,
1417 &provider_binary,
1418 provider_model,
1419 provider_timeout,
1420 &args.mode,
1421 ),
1422 EnrichOperation::GraphAudit => call_graph_audit(
1423 &conn,
1424 &namespace,
1425 &item_key,
1426 &provider_binary,
1427 provider_model,
1428 provider_timeout,
1429 &args.mode,
1430 ),
1431 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
1432 &conn,
1433 &namespace,
1434 &item_key,
1435 &provider_binary,
1436 provider_model,
1437 provider_timeout,
1438 &args.mode,
1439 ),
1440 EnrichOperation::BodyExtract => call_body_extract(
1441 &conn,
1442 &namespace,
1443 &item_key,
1444 &provider_binary,
1445 provider_model,
1446 provider_timeout,
1447 &args.mode,
1448 ),
1449 };
1450
1451 match call_result {
1452 Ok(EnrichItemResult::Done {
1453 memory_id,
1454 entity_id,
1455 entities,
1456 rels,
1457 chars_before,
1458 chars_after,
1459 cost,
1460 is_oauth,
1461 }) => {
1462 if is_oauth && !oauth_detected {
1463 oauth_detected = true;
1464 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
1465 }
1466 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1467
1468 let persist_err: Option<String> = match args.operation {
1470 EnrichOperation::MemoryBindings => {
1471 None
1473 }
1474 EnrichOperation::EntityDescriptions => {
1475 None
1477 }
1478 EnrichOperation::BodyEnrich => {
1479 None
1481 }
1482 _ => {
1483 None
1485 }
1486 };
1487
1488 if let Err(e) = queue_conn.execute(
1489 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1490 rusqlite::params![
1491 memory_id,
1492 entity_id,
1493 entities as i64,
1494 rels as i64,
1495 cost,
1496 item_started.elapsed().as_millis() as i64,
1497 queue_id
1498 ],
1499 ) {
1500 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
1501 }
1502
1503 if persist_err.is_none() {
1504 completed += 1;
1505 if !is_oauth {
1506 cost_total += cost;
1507 }
1508 emit_json(&ItemEvent {
1509 item: &item_key,
1510 status: "done",
1511 memory_id,
1512 entity_id,
1513 entities: Some(entities),
1514 rels: Some(rels),
1515 chars_before,
1516 chars_after,
1517 cost_usd: if is_oauth { None } else { Some(cost) },
1518 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1519 error: None,
1520 index: current_index,
1521 total,
1522 });
1523 } else {
1524 failed += 1;
1525 emit_json(&ItemEvent {
1526 item: &item_key,
1527 status: "failed",
1528 memory_id: None,
1529 entity_id: None,
1530 entities: None,
1531 rels: None,
1532 chars_before: None,
1533 chars_after: None,
1534 cost_usd: None,
1535 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1536 error: persist_err,
1537 index: current_index,
1538 total,
1539 });
1540 }
1541 }
1542 Ok(EnrichItemResult::Skipped { reason }) => {
1543 skipped += 1;
1544 if let Err(e) = queue_conn.execute(
1545 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1546 rusqlite::params![reason, queue_id],
1547 ) {
1548 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
1549 }
1550 emit_json(&ItemEvent {
1551 item: &item_key,
1552 status: "skipped",
1553 memory_id: None,
1554 entity_id: None,
1555 entities: None,
1556 rels: None,
1557 chars_before: None,
1558 chars_after: None,
1559 cost_usd: None,
1560 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1561 error: None,
1562 index: current_index,
1563 total,
1564 });
1565 }
1566 Err(e) => {
1567 let err_str = format!("{e}");
1568 if matches!(e, AppError::RateLimited { .. }) {
1569 if crate::retry::is_kill_switch_active() {
1570 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1571 } else if std::time::Instant::now() >= rate_limit_deadline {
1572 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
1573 } else {
1574 let half = backoff_secs / 2;
1575 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1576 let actual_wait = half + jitter;
1577 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
1578 if let Err(qe) = queue_conn.execute(
1579 "UPDATE queue SET status='pending' WHERE id=?1",
1580 rusqlite::params![queue_id],
1581 ) {
1582 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
1583 }
1584 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1585 backoff_secs = (backoff_secs * 2).min(900);
1586 continue;
1587 }
1588 }
1589
1590 failed += 1;
1591 if let Err(qe) = queue_conn.execute(
1592 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1593 rusqlite::params![err_str, queue_id],
1594 ) {
1595 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
1596 }
1597 emit_json(&ItemEvent {
1598 item: &item_key,
1599 status: "failed",
1600 memory_id: None,
1601 entity_id: None,
1602 entities: None,
1603 rels: None,
1604 chars_before: None,
1605 chars_after: None,
1606 cost_usd: None,
1607 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1608 error: Some(err_str),
1609 index: current_index,
1610 total,
1611 });
1612 }
1613 }
1614
1615 let _ = item_type; }
1617 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1620 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1621
1622 emit_json(&EnrichSummary {
1623 summary: true,
1624 operation: format!("{:?}", args.operation),
1625 items_total: total,
1626 completed,
1627 failed,
1628 skipped,
1629 cost_usd: cost_total,
1630 elapsed_ms: started.elapsed().as_millis() as u64,
1631 });
1632
1633 if failed == 0 {
1634 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
1635 }
1636
1637 Ok(())
1638}
1639
/// Outcome of enriching a single queue item.
///
/// Provider or persistence failures are reported separately as `Err(AppError)`; this
/// enum only describes calls that completed. The run loop turns `Done` into a queue row
/// with status `done` and `Skipped` into status `skipped`.
enum EnrichItemResult {
    /// The item was processed and its result written back.
    Done {
        /// USD cost attributed to the provider call. The run loop omits it from output
        /// and from the running total when `is_oauth` is set.
        cost: f64,
        /// Set when the provider call ran under an OAuth subscription.
        is_oauth: bool,
        /// Memory the item refers to, when the operation is memory-oriented.
        memory_id: Option<i64>,
        /// Entity the item refers to, when the operation is entity-oriented.
        entity_id: Option<i64>,
        /// Number of entities recorded for this item (0 when the operation creates none).
        entities: usize,
        /// Number of relationships recorded or modified for this item.
        rels: usize,
        /// Body length in characters before rewriting (body enrichment only).
        chars_before: Option<usize>,
        /// Body length in characters after rewriting (body enrichment only).
        chars_after: Option<usize>,
    },
    /// The item was deliberately left untouched.
    Skipped {
        /// Human-readable explanation, stored in the queue row's `error` column.
        reason: String,
    },
}
1659
1660fn call_memory_bindings(
1665 conn: &Connection,
1666 namespace: &str,
1667 memory_name: &str,
1668 binary: &Path,
1669 model: Option<&str>,
1670 timeout: u64,
1671 mode: &EnrichMode,
1672) -> Result<EnrichItemResult, AppError> {
1673 let (memory_id, body): (i64, String) = conn.query_row(
1675 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
1676 rusqlite::params![namespace, memory_name],
1677 |r| Ok((r.get(0)?, r.get(1)?)),
1678 ).map_err(|e| match e {
1679 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
1680 other => AppError::Database(other),
1681 })?;
1682
1683 if body.trim().is_empty() {
1684 return Ok(EnrichItemResult::Skipped {
1685 reason: "body is empty".to_string(),
1686 });
1687 }
1688
1689 let (value, cost, is_oauth) = match mode {
1690 EnrichMode::ClaudeCode => call_claude(
1691 binary,
1692 BINDINGS_PROMPT,
1693 BINDINGS_SCHEMA,
1694 &body,
1695 model,
1696 timeout,
1697 )?,
1698 EnrichMode::Codex => call_codex(
1699 binary,
1700 BINDINGS_PROMPT,
1701 BINDINGS_SCHEMA,
1702 &body,
1703 model,
1704 timeout,
1705 )?,
1706 };
1707
1708 let empty_arr = serde_json::Value::Array(vec![]);
1709 let entities_val = value.get("entities").unwrap_or(&empty_arr);
1710 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
1711
1712 let (ent_count, rel_count) =
1713 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
1714
1715 Ok(EnrichItemResult::Done {
1716 memory_id: Some(memory_id),
1717 entity_id: None,
1718 entities: ent_count,
1719 rels: rel_count,
1720 chars_before: None,
1721 chars_after: None,
1722 cost,
1723 is_oauth,
1724 })
1725}
1726
1727fn call_entity_description(
1728 conn: &Connection,
1729 namespace: &str,
1730 entity_name: &str,
1731 binary: &Path,
1732 model: Option<&str>,
1733 timeout: u64,
1734 mode: &EnrichMode,
1735) -> Result<EnrichItemResult, AppError> {
1736 let (entity_id, entity_type): (i64, String) = conn
1737 .query_row(
1738 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
1739 rusqlite::params![namespace, entity_name],
1740 |r| Ok((r.get(0)?, r.get(1)?)),
1741 )
1742 .map_err(|e| match e {
1743 rusqlite::Error::QueryReturnedNoRows => {
1744 AppError::NotFound(format!("entity '{entity_name}' not found"))
1745 }
1746 other => AppError::Database(other),
1747 })?;
1748
1749 let prompt = format!(
1750 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
1751 );
1752
1753 let (value, cost, is_oauth) = match mode {
1754 EnrichMode::ClaudeCode => call_claude(
1755 binary,
1756 &prompt,
1757 ENTITY_DESCRIPTION_SCHEMA,
1758 "",
1759 model,
1760 timeout,
1761 )?,
1762 EnrichMode::Codex => call_codex(
1763 binary,
1764 &prompt,
1765 ENTITY_DESCRIPTION_SCHEMA,
1766 "",
1767 model,
1768 timeout,
1769 )?,
1770 };
1771
1772 let description = value
1773 .get("description")
1774 .and_then(|v| v.as_str())
1775 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
1776
1777 persist_entity_description(conn, entity_id, description)?;
1778
1779 Ok(EnrichItemResult::Done {
1780 memory_id: None,
1781 entity_id: Some(entity_id),
1782 entities: 0,
1783 rels: 0,
1784 chars_before: None,
1785 chars_after: None,
1786 cost,
1787 is_oauth,
1788 })
1789}
1790
1791#[allow(clippy::too_many_arguments)]
1792fn call_body_enrich(
1793 conn: &Connection,
1794 namespace: &str,
1795 memory_name: &str,
1796 binary: &Path,
1797 model: Option<&str>,
1798 timeout: u64,
1799 mode: &EnrichMode,
1800 min_output_chars: usize,
1801 max_output_chars: usize,
1802 prompt_template: Option<&Path>,
1803 paths: &crate::paths::AppPaths,
1804) -> Result<EnrichItemResult, AppError> {
1805 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
1806 .query_row(
1807 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
1808 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
1809 rusqlite::params![namespace, memory_name],
1810 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
1811 )
1812 .map_err(|e| match e {
1813 rusqlite::Error::QueryReturnedNoRows => {
1814 AppError::NotFound(format!("memory '{memory_name}' not found"))
1815 }
1816 other => AppError::Database(other),
1817 })?;
1818
1819 let chars_before = body.chars().count();
1820
1821 let linked_entities: Vec<String> = {
1823 let mut stmt = conn.prepare_cached(
1824 "SELECT e.name FROM memory_entities me \
1825 JOIN entities e ON e.id = me.entity_id \
1826 WHERE me.memory_id = ?1 LIMIT 10",
1827 )?;
1828 let result: Vec<String> = stmt
1829 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
1830 .filter_map(|r| r.ok())
1831 .collect();
1832 drop(stmt);
1833 result
1834 };
1835
1836 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
1838 let file_size = std::fs::metadata(tmpl_path)
1839 .map_err(|e| {
1840 AppError::Io(std::io::Error::new(
1841 e.kind(),
1842 format!("failed to stat prompt template: {e}"),
1843 ))
1844 })?
1845 .len();
1846 if file_size > MAX_MEMORY_BODY_LEN as u64 {
1847 return Err(AppError::LimitExceeded(
1848 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
1849 ));
1850 }
1851 std::fs::read_to_string(tmpl_path).map_err(|e| {
1852 AppError::Io(std::io::Error::new(
1853 e.kind(),
1854 format!("failed to read prompt template: {e}"),
1855 ))
1856 })?
1857 } else {
1858 BODY_ENRICH_PROMPT_PREFIX.to_string()
1859 };
1860
1861 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
1863 let mut ctx = String::new();
1864 ctx.push_str(&format!(
1865 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
1866 ));
1867 if !description.is_empty() {
1868 ctx.push_str(&format!("- Description: {description}\n"));
1869 }
1870 ctx.push_str(&format!("- Domain: {namespace}\n"));
1871 if !linked_entities.is_empty() {
1872 ctx.push_str(&format!(
1873 "- Linked entities: {}\n",
1874 linked_entities.join(", ")
1875 ));
1876 }
1877 ctx
1878 } else {
1879 String::new()
1880 };
1881
1882 let prompt = format!(
1883 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
1884 );
1885
1886 let (value, cost, is_oauth) = match mode {
1888 EnrichMode::ClaudeCode => {
1889 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
1890 }
1891 EnrichMode::Codex => {
1892 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
1893 }
1894 };
1895
1896 let enriched_body = value
1897 .get("enriched_body")
1898 .and_then(|v| v.as_str())
1899 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
1900
1901 let chars_after = enriched_body.chars().count();
1902
1903 if chars_after <= chars_before {
1905 return Ok(EnrichItemResult::Skipped {
1906 reason: format!(
1907 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
1908 ),
1909 });
1910 }
1911
1912 persist_enriched_body(
1913 conn,
1914 namespace,
1915 memory_id,
1916 memory_name,
1917 enriched_body,
1918 paths,
1919 )?;
1920
1921 Ok(EnrichItemResult::Done {
1922 memory_id: Some(memory_id),
1923 entity_id: None,
1924 entities: 0,
1925 rels: 0,
1926 chars_before: Some(chars_before),
1927 chars_after: Some(chars_after),
1928 cost,
1929 is_oauth,
1930 })
1931}
1932
1933fn scan_operation(
1938 conn: &Connection,
1939 namespace: &str,
1940 args: &EnrichArgs,
1941) -> Result<Vec<String>, AppError> {
1942 match args.operation {
1943 EnrichOperation::MemoryBindings => {
1944 let rows = scan_unbound_memories(conn, namespace, args.limit)?;
1945 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1946 }
1947 EnrichOperation::EntityDescriptions => {
1948 let rows = scan_entities_without_description(conn, namespace, args.limit)?;
1949 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1950 }
1951 EnrichOperation::BodyEnrich => {
1952 let rows =
1953 scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
1954 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1955 }
1956 EnrichOperation::WeightCalibrate => {
1957 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
1958 Ok(rows
1959 .into_iter()
1960 .map(|(id, _, _, _, _)| id.to_string())
1961 .collect())
1962 }
1963 EnrichOperation::RelationReclassify => {
1964 let rows = scan_generic_relations(conn, namespace, args.limit)?;
1965 Ok(rows
1966 .into_iter()
1967 .map(|(id, _, _, _)| id.to_string())
1968 .collect())
1969 }
1970 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
1971 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
1972 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
1973 }
1974 EnrichOperation::EntityTypeValidate => {
1975 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
1976 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1977 }
1978 EnrichOperation::DescriptionEnrich => {
1979 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
1980 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
1981 }
1982 EnrichOperation::DomainClassify
1983 | EnrichOperation::GraphAudit
1984 | EnrichOperation::DeepResearchSynth
1985 | EnrichOperation::BodyExtract => {
1986 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1987 let sql = format!(
1988 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
1989 );
1990 let mut stmt = conn.prepare(&sql)?;
1991 let names = stmt
1992 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
1993 .collect::<Result<Vec<_>, _>>()?;
1994 Ok(names)
1995 }
1996 }
1997}
1998
1999fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2005 if let Some(p) = explicit {
2006 if p.exists() {
2007 return Ok(p.to_path_buf());
2008 }
2009 return Err(AppError::Validation(format!(
2010 "Codex binary not found at explicit path: {}",
2011 p.display()
2012 )));
2013 }
2014
2015 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2016 let p = PathBuf::from(&env_path);
2017 if p.exists() {
2018 return Ok(p);
2019 }
2020 }
2021
2022 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2023 if let Some(path_var) = std::env::var_os("PATH") {
2024 for dir in std::env::split_paths(&path_var) {
2025 let candidate = dir.join(name);
2026 if candidate.exists() {
2027 return Ok(candidate);
2028 }
2029 }
2030 }
2031
2032 Err(AppError::Validation(
2033 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2034 ))
2035}
2036
2037fn call_weight_calibrate(
2039 conn: &Connection,
2040 _namespace: &str,
2041 item_key: &str,
2042 binary: &Path,
2043 model: Option<&str>,
2044 timeout: u64,
2045 mode: &EnrichMode,
2046) -> Result<EnrichItemResult, AppError> {
2047 let rel_id: i64 = item_key
2048 .parse()
2049 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2050 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2051 .query_row(
2052 "SELECT e1.name, e2.name, r.relation, r.weight \
2053 FROM relationships r \
2054 JOIN entities e1 ON e1.id = r.source_id \
2055 JOIN entities e2 ON e2.id = r.target_id \
2056 WHERE r.id = ?1",
2057 rusqlite::params![rel_id],
2058 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2059 )
2060 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2061
2062 let input_text = format!(
2063 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2064 );
2065 let (value, cost, is_oauth) = match mode {
2066 EnrichMode::ClaudeCode => call_claude(
2067 binary,
2068 WEIGHT_CALIBRATE_PROMPT,
2069 WEIGHT_CALIBRATE_SCHEMA,
2070 &input_text,
2071 model,
2072 timeout,
2073 )?,
2074 EnrichMode::Codex => call_codex(
2075 binary,
2076 WEIGHT_CALIBRATE_PROMPT,
2077 WEIGHT_CALIBRATE_SCHEMA,
2078 &input_text,
2079 model,
2080 timeout,
2081 )?,
2082 };
2083
2084 let calibrated = value
2085 .get("calibrated_weight")
2086 .and_then(|v| v.as_f64())
2087 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2088
2089 conn.execute(
2090 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2091 rusqlite::params![calibrated, rel_id],
2092 )?;
2093
2094 Ok(EnrichItemResult::Done {
2095 memory_id: None,
2096 entity_id: None,
2097 entities: 0,
2098 rels: 1,
2099 chars_before: None,
2100 chars_after: None,
2101 cost,
2102 is_oauth,
2103 })
2104}
2105
2106fn call_relation_reclassify(
2108 conn: &Connection,
2109 _namespace: &str,
2110 item_key: &str,
2111 binary: &Path,
2112 model: Option<&str>,
2113 timeout: u64,
2114 mode: &EnrichMode,
2115) -> Result<EnrichItemResult, AppError> {
2116 let rel_id: i64 = item_key
2117 .parse()
2118 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2119 let (source_name, target_name, current_relation): (String, String, String) = conn
2120 .query_row(
2121 "SELECT e1.name, e2.name, r.relation \
2122 FROM relationships r \
2123 JOIN entities e1 ON e1.id = r.source_id \
2124 JOIN entities e2 ON e2.id = r.target_id \
2125 WHERE r.id = ?1",
2126 rusqlite::params![rel_id],
2127 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2128 )
2129 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2130
2131 let input_text = format!(
2132 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
2133 );
2134 let (value, cost, is_oauth) = match mode {
2135 EnrichMode::ClaudeCode => call_claude(
2136 binary,
2137 RELATION_RECLASSIFY_PROMPT,
2138 RELATION_RECLASSIFY_SCHEMA,
2139 &input_text,
2140 model,
2141 timeout,
2142 )?,
2143 EnrichMode::Codex => call_codex(
2144 binary,
2145 RELATION_RECLASSIFY_PROMPT,
2146 RELATION_RECLASSIFY_SCHEMA,
2147 &input_text,
2148 model,
2149 timeout,
2150 )?,
2151 };
2152
2153 let new_relation = value
2154 .get("relation")
2155 .and_then(|v| v.as_str())
2156 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
2157 let new_strength = value
2158 .get("strength")
2159 .and_then(|v| v.as_f64())
2160 .unwrap_or(0.5);
2161
2162 conn.execute(
2163 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
2164 rusqlite::params![new_relation, new_strength, rel_id],
2165 )?;
2166
2167 Ok(EnrichItemResult::Done {
2168 memory_id: None,
2169 entity_id: None,
2170 entities: 0,
2171 rels: 1,
2172 chars_before: None,
2173 chars_after: None,
2174 cost,
2175 is_oauth,
2176 })
2177}
2178
2179fn call_entity_connect(
2181 conn: &Connection,
2182 namespace: &str,
2183 item_key: &str,
2184 binary: &Path,
2185 model: Option<&str>,
2186 timeout: u64,
2187 mode: &EnrichMode,
2188) -> Result<EnrichItemResult, AppError> {
2189 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
2190 let (e1_id, e1_name, e2_id, e2_name) =
2191 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
2192 Some(p) => p,
2193 None => {
2194 return Ok(EnrichItemResult::Skipped {
2195 reason: "pair no longer isolated".into(),
2196 })
2197 }
2198 };
2199 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
2200 let (value, cost, is_oauth) = match mode {
2201 EnrichMode::ClaudeCode => call_claude(
2202 binary,
2203 ENTITY_CONNECT_PROMPT,
2204 ENTITY_CONNECT_SCHEMA,
2205 &input_text,
2206 model,
2207 timeout,
2208 )?,
2209 EnrichMode::Codex => call_codex(
2210 binary,
2211 ENTITY_CONNECT_PROMPT,
2212 ENTITY_CONNECT_SCHEMA,
2213 &input_text,
2214 model,
2215 timeout,
2216 )?,
2217 };
2218 let relation = value
2219 .get("relation")
2220 .and_then(|v| v.as_str())
2221 .unwrap_or("none");
2222 if relation == "none" {
2223 return Ok(EnrichItemResult::Skipped {
2224 reason: "LLM determined no relationship".into(),
2225 });
2226 }
2227 let strength = value
2228 .get("strength")
2229 .and_then(|v| v.as_f64())
2230 .unwrap_or(0.5);
2231 conn.execute(
2232 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
2233 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
2234 )?;
2235 Ok(EnrichItemResult::Done {
2236 memory_id: None,
2237 entity_id: None,
2238 entities: 0,
2239 rels: 1,
2240 chars_before: None,
2241 chars_after: None,
2242 cost,
2243 is_oauth,
2244 })
2245}
2246
2247fn call_entity_type_validate(
2249 conn: &Connection,
2250 _namespace: &str,
2251 item_key: &str,
2252 binary: &Path,
2253 model: Option<&str>,
2254 timeout: u64,
2255 mode: &EnrichMode,
2256) -> Result<EnrichItemResult, AppError> {
2257 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
2258 .query_row(
2259 "SELECT id, name, type FROM entities WHERE name = ?1",
2260 rusqlite::params![item_key],
2261 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2262 )
2263 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
2264 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
2265 let (value, cost, is_oauth) = match mode {
2266 EnrichMode::ClaudeCode => call_claude(
2267 binary,
2268 ENTITY_TYPE_VALIDATE_PROMPT,
2269 ENTITY_TYPE_VALIDATE_SCHEMA,
2270 &input_text,
2271 model,
2272 timeout,
2273 )?,
2274 EnrichMode::Codex => call_codex(
2275 binary,
2276 ENTITY_TYPE_VALIDATE_PROMPT,
2277 ENTITY_TYPE_VALIDATE_SCHEMA,
2278 &input_text,
2279 model,
2280 timeout,
2281 )?,
2282 };
2283 let validated_type = value
2284 .get("validated_type")
2285 .and_then(|v| v.as_str())
2286 .unwrap_or(&ent_type);
2287 let was_correct = value
2288 .get("was_correct")
2289 .and_then(|v| v.as_bool())
2290 .unwrap_or(true);
2291 if !was_correct {
2292 conn.execute(
2293 "UPDATE entities SET type = ?1 WHERE id = ?2",
2294 rusqlite::params![validated_type, ent_id],
2295 )?;
2296 }
2297 Ok(EnrichItemResult::Done {
2298 memory_id: None,
2299 entity_id: Some(ent_id),
2300 entities: 1,
2301 rels: 0,
2302 chars_before: None,
2303 chars_after: None,
2304 cost,
2305 is_oauth,
2306 })
2307}
2308
2309fn call_description_enrich(
2311 conn: &Connection,
2312 _namespace: &str,
2313 item_key: &str,
2314 binary: &Path,
2315 model: Option<&str>,
2316 timeout: u64,
2317 mode: &EnrichMode,
2318) -> Result<EnrichItemResult, AppError> {
2319 let (mem_id, body, old_desc): (i64, String, String) = conn
2320 .query_row(
2321 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2322 rusqlite::params![item_key],
2323 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2324 )
2325 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2326 let snippet: String = body.chars().take(500).collect();
2327 let input_text = format!(
2328 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
2329 );
2330 let (value, cost, is_oauth) = match mode {
2331 EnrichMode::ClaudeCode => call_claude(
2332 binary,
2333 DESCRIPTION_ENRICH_PROMPT,
2334 DESCRIPTION_ENRICH_SCHEMA,
2335 &input_text,
2336 model,
2337 timeout,
2338 )?,
2339 EnrichMode::Codex => call_codex(
2340 binary,
2341 DESCRIPTION_ENRICH_PROMPT,
2342 DESCRIPTION_ENRICH_SCHEMA,
2343 &input_text,
2344 model,
2345 timeout,
2346 )?,
2347 };
2348 let new_desc = value
2349 .get("description")
2350 .and_then(|v| v.as_str())
2351 .unwrap_or(&old_desc);
2352 conn.execute(
2353 "UPDATE memories SET description = ?1 WHERE id = ?2",
2354 rusqlite::params![new_desc, mem_id],
2355 )?;
2356 Ok(EnrichItemResult::Done {
2357 memory_id: Some(mem_id),
2358 entity_id: None,
2359 entities: 0,
2360 rels: 0,
2361 chars_before: Some(old_desc.len()),
2362 chars_after: Some(new_desc.len()),
2363 cost,
2364 is_oauth,
2365 })
2366}
2367
2368fn call_domain_classify(
2370 conn: &Connection,
2371 _namespace: &str,
2372 item_key: &str,
2373 binary: &Path,
2374 model: Option<&str>,
2375 timeout: u64,
2376 mode: &EnrichMode,
2377) -> Result<EnrichItemResult, AppError> {
2378 let (mem_id, body, desc): (i64, String, String) = conn
2379 .query_row(
2380 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2381 rusqlite::params![item_key],
2382 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2383 )
2384 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2385 let snippet: String = body.chars().take(500).collect();
2386 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
2387 let (value, cost, is_oauth) = match mode {
2388 EnrichMode::ClaudeCode => call_claude(
2389 binary,
2390 DOMAIN_CLASSIFY_PROMPT,
2391 DOMAIN_CLASSIFY_SCHEMA,
2392 &input_text,
2393 model,
2394 timeout,
2395 )?,
2396 EnrichMode::Codex => call_codex(
2397 binary,
2398 DOMAIN_CLASSIFY_PROMPT,
2399 DOMAIN_CLASSIFY_SCHEMA,
2400 &input_text,
2401 model,
2402 timeout,
2403 )?,
2404 };
2405 let domain = value
2406 .get("domain")
2407 .and_then(|v| v.as_str())
2408 .unwrap_or("uncategorized");
2409 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
2410 conn.execute(
2411 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
2412 rusqlite::params![metadata, mem_id],
2413 )?;
2414 Ok(EnrichItemResult::Done {
2415 memory_id: Some(mem_id),
2416 entity_id: None,
2417 entities: 0,
2418 rels: 0,
2419 chars_before: None,
2420 chars_after: None,
2421 cost,
2422 is_oauth,
2423 })
2424}
2425
2426fn call_graph_audit(
2428 conn: &Connection,
2429 _namespace: &str,
2430 item_key: &str,
2431 binary: &Path,
2432 model: Option<&str>,
2433 timeout: u64,
2434 mode: &EnrichMode,
2435) -> Result<EnrichItemResult, AppError> {
2436 let (mem_id, body, desc): (i64, String, String) = conn
2437 .query_row(
2438 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2439 rusqlite::params![item_key],
2440 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2441 )
2442 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2443 let snippet: String = body.chars().take(500).collect();
2444 let ent_count: i64 = conn
2445 .query_row(
2446 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
2447 rusqlite::params![mem_id],
2448 |r| r.get(0),
2449 )
2450 .unwrap_or(0);
2451 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
2452 let (value, cost, is_oauth) = match mode {
2453 EnrichMode::ClaudeCode => call_claude(
2454 binary,
2455 GRAPH_AUDIT_PROMPT,
2456 GRAPH_AUDIT_SCHEMA,
2457 &input_text,
2458 model,
2459 timeout,
2460 )?,
2461 EnrichMode::Codex => call_codex(
2462 binary,
2463 GRAPH_AUDIT_PROMPT,
2464 GRAPH_AUDIT_SCHEMA,
2465 &input_text,
2466 model,
2467 timeout,
2468 )?,
2469 };
2470 let issues = value
2471 .get("issues")
2472 .and_then(|v| v.as_array())
2473 .map(|a| a.len())
2474 .unwrap_or(0);
2475 Ok(EnrichItemResult::Done {
2476 memory_id: Some(mem_id),
2477 entity_id: None,
2478 entities: 0,
2479 rels: issues,
2480 chars_before: None,
2481 chars_after: None,
2482 cost,
2483 is_oauth,
2484 })
2485}
2486
2487fn call_deep_research_synth(
2489 conn: &Connection,
2490 namespace: &str,
2491 item_key: &str,
2492 binary: &Path,
2493 model: Option<&str>,
2494 timeout: u64,
2495 mode: &EnrichMode,
2496) -> Result<EnrichItemResult, AppError> {
2497 let (mem_id, body): (i64, String) = conn
2498 .query_row(
2499 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2500 rusqlite::params![item_key],
2501 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
2502 )
2503 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2504 let snippet: String = body.chars().take(2000).collect();
2505 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
2506 let (value, cost, is_oauth) = match mode {
2507 EnrichMode::ClaudeCode => call_claude(
2508 binary,
2509 DEEP_RESEARCH_SYNTH_PROMPT,
2510 DEEP_RESEARCH_SYNTH_SCHEMA,
2511 &input_text,
2512 model,
2513 timeout,
2514 )?,
2515 EnrichMode::Codex => call_codex(
2516 binary,
2517 DEEP_RESEARCH_SYNTH_PROMPT,
2518 DEEP_RESEARCH_SYNTH_SCHEMA,
2519 &input_text,
2520 model,
2521 timeout,
2522 )?,
2523 };
2524 let mut ent_count = 0usize;
2525 let mut rel_count = 0usize;
2526 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
2527 for e in ents {
2528 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
2529 let etype_str = e
2530 .get("entity_type")
2531 .and_then(|v| v.as_str())
2532 .unwrap_or("concept");
2533 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
2534 if name.len() >= 2 {
2535 let ne = NewEntity {
2536 name: name.to_string(),
2537 entity_type: etype,
2538 description: None,
2539 };
2540 let _ = entities::upsert_entity(conn, namespace, &ne);
2541 ent_count += 1;
2542 }
2543 }
2544 }
2545 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
2546 for r in rels {
2547 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
2548 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
2549 if src.is_empty() || tgt.is_empty() {
2550 continue;
2551 }
2552 let rel = r
2553 .get("relation")
2554 .and_then(|v| v.as_str())
2555 .unwrap_or("related");
2556 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
2557 if let (Some(sid), Some(tid)) = (
2558 entities::find_entity_id(conn, namespace, src)?,
2559 entities::find_entity_id(conn, namespace, tgt)?,
2560 ) {
2561 let _ = entities::create_or_fetch_relationship(
2562 conn, namespace, sid, tid, rel, str_, None,
2563 );
2564 rel_count += 1;
2565 }
2566 }
2567 }
2568 Ok(EnrichItemResult::Done {
2569 memory_id: Some(mem_id),
2570 entity_id: None,
2571 entities: ent_count,
2572 rels: rel_count,
2573 chars_before: None,
2574 chars_after: None,
2575 cost,
2576 is_oauth,
2577 })
2578}
2579
2580fn call_body_extract(
2582 conn: &Connection,
2583 _namespace: &str,
2584 item_key: &str,
2585 binary: &Path,
2586 model: Option<&str>,
2587 timeout: u64,
2588 mode: &EnrichMode,
2589) -> Result<EnrichItemResult, AppError> {
2590 let (mem_id, body): (i64, String) = conn
2591 .query_row(
2592 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2593 rusqlite::params![item_key],
2594 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
2595 )
2596 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2597 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
2598 let (value, cost, is_oauth) = match mode {
2599 EnrichMode::ClaudeCode => call_claude(
2600 binary,
2601 BODY_EXTRACT_PROMPT,
2602 BODY_EXTRACT_SCHEMA,
2603 &input_text,
2604 model,
2605 timeout,
2606 )?,
2607 EnrichMode::Codex => call_codex(
2608 binary,
2609 BODY_EXTRACT_PROMPT,
2610 BODY_EXTRACT_SCHEMA,
2611 &input_text,
2612 model,
2613 timeout,
2614 )?,
2615 };
2616 let restructured = value
2617 .get("restructured_body")
2618 .and_then(|v| v.as_str())
2619 .unwrap_or(&body);
2620 let chars_before = body.len();
2621 let chars_after = restructured.len();
2622 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
2623 conn.execute(
2624 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
2625 rusqlite::params![restructured, new_hash, mem_id],
2626 )?;
2627 Ok(EnrichItemResult::Done {
2628 memory_id: Some(mem_id),
2629 entity_id: None,
2630 entities: 0,
2631 rels: 0,
2632 chars_before: Some(chars_before),
2633 chars_after: Some(chars_after),
2634 cost,
2635 is_oauth,
2636 })
2637}
2638
2639#[allow(clippy::type_complexity)]
2641fn scan_isolated_entity_pairs(
2642 conn: &Connection,
2643 namespace: &str,
2644 limit: Option<usize>,
2645) -> Result<Vec<(i64, String, i64, String)>, AppError> {
2646 let limit_val = limit.unwrap_or(50) as i64;
2647 let mut stmt = conn.prepare_cached(
2648 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
2649 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
2650 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
2651 (r.source_id = e1.id AND r.target_id = e2.id) OR \
2652 (r.source_id = e2.id AND r.target_id = e1.id)) \
2653 LIMIT ?2",
2654 )?;
2655 let rows = stmt
2656 .query_map(rusqlite::params![namespace, limit_val], |r| {
2657 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
2658 })?
2659 .collect::<Result<Vec<_>, _>>()?;
2660 Ok(rows)
2661}
2662
2663fn scan_entities_for_type_validation(
2665 conn: &Connection,
2666 namespace: &str,
2667 limit: Option<usize>,
2668) -> Result<Vec<(i64, String, String)>, AppError> {
2669 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2670 let sql = format!(
2671 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
2672 );
2673 let mut stmt = conn.prepare(&sql)?;
2674 let rows = stmt
2675 .query_map(rusqlite::params![namespace], |r| {
2676 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
2677 })?
2678 .collect::<Result<Vec<_>, _>>()?;
2679 Ok(rows)
2680}
2681
2682fn scan_generic_descriptions(
2684 conn: &Connection,
2685 namespace: &str,
2686 limit: Option<usize>,
2687) -> Result<Vec<(i64, String, String)>, AppError> {
2688 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2689 let sql = format!(
2690 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
2691 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
2692 ORDER BY id {limit_clause}"
2693 );
2694 let mut stmt = conn.prepare(&sql)?;
2695 let rows = stmt
2696 .query_map(rusqlite::params![namespace], |r| {
2697 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
2698 })?
2699 .collect::<Result<Vec<_>, _>>()?;
2700 Ok(rows)
2701}
2702
2703fn call_codex(
2707 binary: &Path,
2708 prompt: &str,
2709 json_schema: &str,
2710 input_text: &str,
2711 model: Option<&str>,
2712 timeout_secs: u64,
2713) -> Result<(serde_json::Value, f64, bool), AppError> {
2714 use wait_timeout::ChildExt;
2715
2716 let full_prompt = format!("{prompt}\n\n{input_text}");
2717 let schema_file = {
2718 let tmp = std::env::temp_dir().join(format!("enrich-schema-{}.json", std::process::id()));
2719 std::fs::write(&tmp, json_schema).map_err(AppError::Io)?;
2720 tmp
2721 };
2722
2723 let mut cmd = Command::new(binary);
2724 cmd.env_clear();
2725 for var in &[
2726 "PATH",
2727 "HOME",
2728 "USER",
2729 "OPENAI_API_KEY",
2730 "TMPDIR",
2731 "TMP",
2732 "TEMP",
2733 ] {
2734 if let Ok(val) = std::env::var(var) {
2735 cmd.env(var, val);
2736 }
2737 }
2738
2739 #[cfg(windows)]
2740 for var in &[
2741 "LOCALAPPDATA",
2742 "APPDATA",
2743 "USERPROFILE",
2744 "SystemRoot",
2745 "COMSPEC",
2746 "PATHEXT",
2747 ] {
2748 if let Ok(val) = std::env::var(var) {
2749 cmd.env(var, val);
2750 }
2751 }
2752
2753 cmd.arg("exec")
2754 .arg("--json")
2755 .arg("--output-schema")
2756 .arg(&schema_file);
2757
2758 if let Some(m) = model {
2759 cmd.arg("--model").arg(m);
2760 }
2761
2762 cmd.stdin(Stdio::piped())
2763 .stdout(Stdio::piped())
2764 .stderr(Stdio::piped());
2765
2766 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
2767 AppError::Io(std::io::Error::new(
2768 e.kind(),
2769 format!("failed to spawn codex: {e}"),
2770 ))
2771 })?;
2772
2773 let stdin_bytes = full_prompt.into_bytes();
2774 let mut child_stdin = child
2775 .stdin
2776 .take()
2777 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
2778 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
2779 child_stdin.write_all(&stdin_bytes)?;
2780 drop(child_stdin);
2781 Ok(())
2782 });
2783
2784 let start = std::time::Instant::now();
2785 let timeout = std::time::Duration::from_secs(timeout_secs);
2786 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
2787
2788 let _ = std::fs::remove_file(&schema_file);
2789
2790 match status {
2791 Some(exit_status) => {
2792 stdin_thread
2793 .join()
2794 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
2795 .map_err(AppError::Io)?;
2796
2797 tracing::debug!(
2798 target: "process",
2799 exit_code = ?exit_status.code(),
2800 elapsed_ms = start.elapsed().as_millis() as u64,
2801 "external process completed"
2802 );
2803
2804 let mut stdout_buf = Vec::new();
2805 if let Some(mut out) = child.stdout.take() {
2806 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
2807 }
2808 if !exit_status.success() {
2809 let mut stderr_buf = Vec::new();
2810 if let Some(mut err) = child.stderr.take() {
2811 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
2812 }
2813 let stderr_str = String::from_utf8_lossy(&stderr_buf);
2814 tracing::warn!(
2815 target: "enrich",
2816 exit_code = ?exit_status.code(),
2817 stderr = %stderr_str.trim(),
2818 "codex process failed"
2819 );
2820 return Err(AppError::Validation(format!(
2821 "codex exited with code {:?}: {}",
2822 exit_status.code(),
2823 stderr_str.trim()
2824 )));
2825 }
2826 let stdout_str = String::from_utf8(stdout_buf)
2827 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
2828 let value: serde_json::Value = serde_json::from_str(&stdout_str).map_err(|e| {
2829 AppError::Validation(format!("failed to parse codex output as JSON: {e}"))
2830 })?;
2831 Ok((value, 0.0, false))
2832 }
2833 None => {
2834 let _ = child.kill();
2835 let _ = child.wait();
2836 let _ = stdin_thread.join();
2837 Err(AppError::Validation(format!(
2838 "codex timed out after {timeout_secs} seconds"
2839 )))
2840 }
2841 }
2842}
2843
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Builds an in-memory database with the tables these tests touch:
    /// memories, entities, memory_entities and relationships.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    #[test]
    fn queue_db_schema_creates_correctly() {
        // Use the platform temp dir instead of a hard-coded `/tmp`, which does
        // not exist on Windows.
        let tmp_path = std::env::temp_dir()
            .join(format!("test-enrich-queue-{}.sqlite", std::process::id()))
            .to_string_lossy()
            .into_owned();
        let sidecars = [format!("{tmp_path}-wal"), format!("{tmp_path}-shm")];
        let cleanup = |path: &str| {
            let _ = std::fs::remove_file(path);
            for sidecar in &sidecars {
                let _ = std::fs::remove_file(sidecar);
            }
        };
        // A file left over from an earlier aborted run would break the
        // empty-queue assertion.
        cleanup(&tmp_path);

        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);

        // Close the connection first: removing an open file fails on Windows.
        drop(conn);
        cleanup(&tmp_path);
    }

    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// Only exercises the scan that a dry run previews; no LLM is involved.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}