1use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::time::Instant;
38
/// Default file name for the enrichment work-queue database.
/// NOTE(review): presumably the path handed to `open_queue_db` when no
/// explicit queue path is configured — confirm at the call site.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default rate-limit wait.
/// NOTE(review): unit presumably seconds — confirm at the call site.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default for `--min-output-chars` (body-enrich target lower bound).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default for `--max-output-chars` (body-enrich target upper bound).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
47
/// JSON Schema for the memory-bindings extraction response.
///
/// The top-level object requires an `entities` array and a `relationships` array.
/// - Each entity has a `name` and an `entity_type` restricted to the ten
///   canonical entity types.
/// - Each relationship has `source`, `target`, a `relation` restricted to the
///   eleven canonical kebab-case relations, and a `strength` in `[0, 1]`.
///
/// Every object forbids extra properties.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
91
/// JSON Schema for an entity-description response: a single required
/// `description` string, no other properties.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for a body-enrich response: a single required
/// `enriched_body` string, no other properties.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
109
/// System prompt asking the model to re-check a relationship weight against a
/// fixed 0.9 / 0.7 / 0.5 / 0.3 scale (same scale as `BINDINGS_PROMPT`).
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibration answer: `calibrated_weight` in
/// `[0, 1]` plus free-text `reasoning`.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
128
/// System prompt asking the model to replace a generic relation with one of
/// the eleven canonical relations listed in the prompt.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the reclassification answer.
///
/// Unlike `BINDINGS_SCHEMA`, `relation` is NOT constrained to an enum here.
/// NOTE(review): the consumer presumably normalizes or validates it — confirm.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
155
/// System prompt asking whether two unconnected entities deserve a
/// relationship. The model answers the literal string `"none"` when they do not.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect answer.
///
/// `relation` is a free string because it may also be the sentinel `"none"`.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
172
/// System prompt asking the model to confirm or correct an entity's type
/// among the ten canonical entity types.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for the type-validation answer.
///
/// `validated_type` is a free string (no enum).
/// NOTE(review): the consumer presumably parses it into `EntityType` — confirm.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
189
/// System prompt asking for a 10-20 word semantic replacement of a generic or
/// auto-generated memory description, with a BAD/GOOD example pair.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrich answer: `description` plus `reasoning`.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
205
/// System prompt asking for a memory's primary domain as a 2-4 word
/// kebab-case name. No fixed domain list is given, so domains are open-ended.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classification answer: `domain`, a
/// `confidence` in `[0, 1]`, and `reasoning`.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
220
/// System prompt asking the model to audit one memory and its entity bindings,
/// listing issues and an overall quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";
225
/// JSON Schema for the graph-audit answer.
///
/// Fields:
/// - `quality_score` in `[0, 1]`;
/// - an `issues` array of `{kind, detail}` objects;
/// - free-text `reasoning`.
///
/// Every object, including each `issues` item, declares
/// `"additionalProperties": false` and lists all of its properties as required.
/// This matches `BINDINGS_SCHEMA`. Strict structured-output validators (e.g.
/// OpenAI Structured Outputs) reject schemas whose nested objects omit it.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"], "additionalProperties": false } },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
236
/// System prompt asking the model to synthesize entities, relationships and a
/// summary from a memory body, using kebab-case names and canonical relations.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";
242
/// JSON Schema for the deep-research synthesis answer.
///
/// Fields: an `entities` array of `{name, entity_type}`, a `relationships`
/// array of `{source, target, relation, strength}`, and a `summary` string.
///
/// As in `BINDINGS_SCHEMA`, every object, including both item objects, declares
/// `"additionalProperties": false` with all properties required. Strict
/// structured-output validators (e.g. OpenAI Structured Outputs) reject nested
/// objects that leave it out.
///
/// NOTE(review): `entity_type`, `relation` and `strength` are unconstrained
/// here, unlike `BINDINGS_SCHEMA`; the consumer must validate them.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"], "additionalProperties": false } },
    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"], "additionalProperties": false } },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
253
/// System prompt asking the model to restructure an unstructured memory body
/// into clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extract answer: `restructured_body` plus
/// `changes_summary`.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
268
/// System prompt for entity/relationship extraction.
///
/// Its rules (kebab-case names, canonical relations, no `mentions`, the
/// 0.9/0.7/0.5/0.3 strength scale) mirror the enums in `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Prompt prefix for entity descriptions.
///
/// It ends with `"Entity name: "`, so the caller presumably appends the entity
/// name (and type) right after it.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for body enrichment.
///
/// It ends with `"Memory body to enrich:\n\n"`, so the caller presumably
/// appends the memory body directly.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
287
// Enrichment operation selected with `--operation` / `-o`.
//
// Variants are exposed to clap (`ValueEnum`) and to serde
// (`rename_all = "kebab-case"`), e.g. `MemoryBindings` <-> "memory-bindings".
// Plain `//` comments are used on purpose: `///` doc comments on a clap
// `ValueEnum` variant become `--help` text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Add entity bindings to memories that have none (see the CLI examples).
    MemoryBindings,
    // Fill entity descriptions that are missing.
    EntityDescriptions,
    // Expand short memory bodies (bounded by --min/--max-output-chars).
    BodyEnrich,
    // Rebuild only missing memory embeddings without rewriting bodies.
    ReEmbed,
    // NOTE(review): presumably driven by WEIGHT_CALIBRATE_PROMPT/_SCHEMA.
    WeightCalibrate,
    // NOTE(review): presumably driven by RELATION_RECLASSIFY_PROMPT/_SCHEMA.
    RelationReclassify,
    // NOTE(review): presumably driven by ENTITY_CONNECT_PROMPT/_SCHEMA.
    EntityConnect,
    // NOTE(review): presumably driven by ENTITY_TYPE_VALIDATE_PROMPT/_SCHEMA.
    EntityTypeValidate,
    // NOTE(review): presumably driven by DESCRIPTION_ENRICH_PROMPT/_SCHEMA.
    DescriptionEnrich,
    // Behavior not visible in this part of the file.
    CrossDomainBridges,
    // NOTE(review): presumably driven by DOMAIN_CLASSIFY_PROMPT/_SCHEMA.
    DomainClassify,
    // NOTE(review): presumably driven by GRAPH_AUDIT_PROMPT/_SCHEMA.
    GraphAudit,
    // NOTE(review): presumably driven by DEEP_RESEARCH_SYNTH_PROMPT/_SCHEMA.
    DeepResearchSynth,
    // NOTE(review): presumably driven by BODY_EXTRACT_PROMPT/_SCHEMA.
    BodyExtract,
}
325
// LLM provider used by `enrich` (`--mode`, default "claude-code"; also the
// value type of `--fallback-mode`). Plain `//` comments on purpose: `///` on a
// clap `ValueEnum` would change `--help` output.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Claude Code CLI; binary resolved via `find_claude_binary`.
    ClaudeCode,
    // Codex CLI; binary resolved via `find_codex_binary`.
    Codex,
}
334
335impl std::fmt::Display for EnrichMode {
336 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337 match self {
338 EnrichMode::ClaudeCode => write!(f, "claude-code"),
339 EnrichMode::Codex => write!(f, "codex"),
340 }
341 }
342}
343
344#[derive(clap::Args)]
346#[command(
347 about = "Enrich graph memories and entities using an LLM provider",
348 after_long_help = "EXAMPLES:\n \
349 # Add missing entity bindings to all unbound memories\n \
350 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
351 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
352 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
353 # Expand short memory bodies (GAP-18)\n \
354 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
355 # Rebuild only missing memory embeddings without rewriting bodies\n \
356 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
357 # Resume an interrupted body-enrich run\n \
358 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
359 # Retry only failed items from a previous run\n \
360 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
361 EXIT CODES:\n \
362 0 success\n \
363 1 validation error (bad args, binary not found)\n \
364 14 I/O error"
365)]
366pub struct EnrichArgs {
367 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
369 pub operation: EnrichOperation,
370
371 #[arg(long, value_enum, default_value = "claude-code")]
373 pub mode: EnrichMode,
374
375 #[arg(long, value_name = "N")]
377 pub limit: Option<usize>,
378
379 #[arg(long)]
381 pub dry_run: bool,
382
383 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
385 pub namespace: Option<String>,
386
387 #[arg(long, value_name = "PATH")]
390 pub claude_binary: Option<PathBuf>,
391
392 #[arg(long, value_name = "MODEL")]
394 pub claude_model: Option<String>,
395
396 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
398 pub claude_timeout: u64,
399
400 #[arg(long, value_name = "PATH")]
403 pub codex_binary: Option<PathBuf>,
404
405 #[arg(long, value_name = "MODEL")]
407 pub codex_model: Option<String>,
408
409 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
411 pub codex_timeout: u64,
412
413 #[arg(long, value_name = "USD")]
416 pub max_cost_usd: Option<f64>,
417
418 #[arg(long)]
421 pub resume: bool,
422
423 #[arg(long)]
425 pub retry_failed: bool,
426
427 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
430 pub min_output_chars: usize,
431
432 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
434 pub max_output_chars: usize,
435
436 #[arg(long, default_value_t = true)]
438 pub preserve_check: bool,
439
440 #[arg(long, value_name = "PATH")]
442 pub prompt_template: Option<PathBuf>,
443
444 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
448 pub llm_parallelism: u32,
449
450 #[arg(long)]
453 pub json: bool,
454
455 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
457 pub db: Option<String>,
458
459 #[arg(long, value_name = "SECONDS")]
462 pub wait_job_singleton: Option<u64>,
463
464 #[arg(long, default_value_t = false)]
468 pub force_job_singleton: bool,
469
470 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
474 pub names: Vec<String>,
475
476 #[arg(long, value_name = "PATH")]
480 pub names_file: Option<PathBuf>,
481
482 #[arg(long, default_value_t = false)]
486 pub preflight_check: bool,
487
488 #[arg(long, value_enum)]
492 pub fallback_mode: Option<EnrichMode>,
493
494 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
497 pub rate_limit_buffer: u64,
498
499 #[arg(long, default_value_t = true)]
503 pub max_load_check: bool,
504
505 #[arg(long, value_name = "N", default_value_t = 5)]
508 pub circuit_breaker_threshold: u32,
509
510 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
517 pub preserve_threshold: f64,
518
519 #[arg(long, default_value_t = true)]
524 pub codex_model_validate: bool,
525
526 #[arg(long, value_name = "MODEL")]
531 pub codex_model_fallback: Option<String>,
532}
533
/// JSON progress record announcing a run phase.
///
/// Only `phase` is always serialized; every `None` field is omitted
/// (`skip_serializing_if`). Field order here is the key order in the output.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label.
    phase: &'a str,
    /// Resolved provider binary path, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Provider version string, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of candidate items.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Items still pending. NOTE(review): presumably excludes resumed/completed
    /// queue entries — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Effective `--llm-parallelism`.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
557
/// JSON progress record for one processed item.
///
/// `item`, `status`, `index` and `total` are always serialized; every `None`
/// field is omitted. Field order here is the key order in the output.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier. NOTE(review): presumably the queue `item_key` — confirm.
    item: &'a str,
    /// Outcome label. NOTE(review): presumably mirrors queue `status` values.
    status: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities persisted for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships persisted for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before enrichment (body-enrich style operations).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after enrichment.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Provider-reported cost of this item in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item in the run. NOTE(review): 0- or 1-based? confirm.
    index: usize,
    /// Number of items in the run.
    total: usize,
}
584
/// Final JSON record summarizing an enrichment run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field. NOTE(review): presumably always `true` so consumers can
    /// tell the summary line apart from progress events — confirm.
    summary: bool,
    /// Operation name.
    operation: String,
    /// Items considered by the run.
    items_total: usize,
    /// Items processed successfully.
    completed: usize,
    /// Items that failed.
    failed: usize,
    /// Items skipped.
    skipped: usize,
    /// Accumulated provider cost in USD.
    cost_usd: f64,
    /// Total run wall-clock time.
    elapsed_ms: u64,
}
596
597use crate::output::emit_json_line as emit_json;
598
599fn open_queue_db(path: &str) -> Result<Connection, AppError> {
614 let conn = Connection::open(path)?;
615 conn.pragma_update(None, "journal_mode", "wal")?;
616 conn.execute_batch(
617 "CREATE TABLE IF NOT EXISTS queue (
618 id INTEGER PRIMARY KEY AUTOINCREMENT,
619 item_key TEXT NOT NULL UNIQUE,
620 item_type TEXT NOT NULL DEFAULT 'memory',
621 status TEXT NOT NULL DEFAULT 'pending',
622 memory_id INTEGER,
623 entity_id INTEGER,
624 entities INTEGER DEFAULT 0,
625 rels INTEGER DEFAULT 0,
626 error TEXT,
627 cost_usd REAL DEFAULT 0.0,
628 attempt INTEGER DEFAULT 0,
629 elapsed_ms INTEGER,
630 created_at TEXT DEFAULT (datetime('now')),
631 done_at TEXT
632 );
633 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
634 )?;
635 Ok(conn)
636}
637
638fn call_claude(
646 binary: &Path,
647 prompt: &str,
648 json_schema: &str,
649 input_text: &str,
650 model: Option<&str>,
651 timeout_secs: u64,
652) -> Result<(serde_json::Value, f64, bool), AppError> {
653 let result = crate::commands::claude_runner::run_claude(
654 binary,
655 prompt,
656 json_schema,
657 input_text,
658 model,
659 timeout_secs,
660 7,
661 )?;
662 Ok((result.value, result.cost_usd, result.is_oauth))
663}
664
/// Result of `run_preflight_probe`.
enum PreflightOutcome {
    /// The probe process exited successfully.
    Healthy,
    /// The probe failed and its stderr matched a provider rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the failed probe.
        reason: String,
        /// Static hint telling the operator how to proceed.
        suggestion: &'static str,
    },
    /// Any other failure: binary not found, spawn/wait error, timeout, or a
    /// non-zero exit without a rate-limit marker.
    Error(AppError),
}
683
684fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
692 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
693
694 match args.mode {
695 EnrichMode::ClaudeCode => {
696 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
697 Ok(b) => b,
698 Err(e) => return PreflightOutcome::Error(e),
699 };
700 let mut cmd = std::process::Command::new(&bin);
701 cmd.env_clear();
702 for var in &["PATH", "HOME", "USER"] {
703 if let Ok(val) = std::env::var(var) {
704 cmd.env(var, val);
705 }
706 }
707 cmd.arg("-p")
708 .arg("ping")
709 .arg("--max-turns")
710 .arg("1")
711 .arg("--strict-mcp-config")
712 .arg("--mcp-config")
713 .arg("{}")
714 .arg("--dangerously-skip-permissions")
715 .arg("--settings")
716 .arg("{\"hooks\":{}}")
717 .arg("--output-format")
718 .arg("json")
719 .stdin(std::process::Stdio::null())
720 .stdout(std::process::Stdio::piped())
721 .stderr(std::process::Stdio::piped());
722
723 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
724 Ok(c) => c,
725 Err(e) => {
726 return PreflightOutcome::Error(AppError::Io(e));
727 }
728 };
729 let output = match wait_with_timeout(child, timeout) {
730 Ok(out) => out,
731 Err(e) => return PreflightOutcome::Error(e),
732 };
733 if !output.status.success() {
734 let stderr = String::from_utf8_lossy(&output.stderr);
735 if stderr.contains("hit your session limit")
736 || stderr.contains("rate_limit")
737 || stderr.contains("429")
738 {
739 return PreflightOutcome::RateLimited {
740 reason: stderr.trim().to_string(),
741 suggestion:
742 "wait for the OAuth window to reset or use --fallback-mode codex",
743 };
744 }
745 return PreflightOutcome::Error(AppError::Validation(format!(
746 "preflight probe failed: {stderr}",
747 stderr = stderr.trim()
748 )));
749 }
750 PreflightOutcome::Healthy
751 }
752 EnrichMode::Codex => {
753 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
754 Ok(b) => b,
755 Err(e) => return PreflightOutcome::Error(e),
756 };
757 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
758 .map_err(PreflightOutcome::Error)
759 .ok();
760 let schema = "{}";
761 let schema_path = match super::codex_spawn::trusted_schema_path() {
762 Ok(p) => p,
763 Err(e) => return PreflightOutcome::Error(e),
764 };
765 let spawn_args = super::codex_spawn::CodexSpawnArgs {
766 binary: &bin,
767 prompt: "ping",
768 json_schema: schema,
769 input_text: "",
770 model: args.codex_model.as_deref(),
771 timeout_secs: args.rate_limit_buffer.max(60),
772 schema_path: schema_path.clone(),
773 };
774 let mut cmd = super::codex_spawn::build_codex_command(&spawn_args);
775 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
776 Ok(c) => c,
777 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
778 };
779 let output = match wait_with_timeout(child, timeout) {
780 Ok(out) => out,
781 Err(e) => return PreflightOutcome::Error(e),
782 };
783 let _ = std::fs::remove_file(&schema_path);
784 if !output.status.success() {
785 let stderr = String::from_utf8_lossy(&output.stderr);
786 if stderr.contains("rate_limit")
787 || stderr.contains("429")
788 || stderr.contains("Too Many Requests")
789 {
790 return PreflightOutcome::RateLimited {
791 reason: stderr.trim().to_string(),
792 suggestion: "wait for the rate-limit window to reset",
793 };
794 }
795 return PreflightOutcome::Error(AppError::Validation(format!(
796 "preflight probe failed: {stderr}",
797 stderr = stderr.trim()
798 )));
799 }
800 PreflightOutcome::Healthy
801 }
802 }
803}
804
805fn wait_with_timeout(
807 mut child: std::process::Child,
808 timeout: std::time::Duration,
809) -> Result<std::process::Output, AppError> {
810 use wait_timeout::ChildExt;
811 let start = std::time::Instant::now();
812 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
813 if status.is_none() {
814 let _ = child.kill();
815 let _ = child.wait();
816 return Err(AppError::Validation(format!(
817 "preflight probe timed out after {}s",
818 start.elapsed().as_secs()
819 )));
820 }
821 let mut stdout = Vec::new();
822 if let Some(mut out) = child.stdout.take() {
823 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
824 }
825 let mut stderr = Vec::new();
826 if let Some(mut err) = child.stderr.take() {
827 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
828 }
829 let exit = status.unwrap();
830 Ok(std::process::Output {
831 status: exit,
832 stdout,
833 stderr,
834 })
835}
836
837fn scan_unbound_memories(
848 conn: &Connection,
849 namespace: &str,
850 limit: Option<usize>,
851 name_filter: &[String],
852) -> Result<Vec<(i64, String, String)>, AppError> {
853 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
854
855 if name_filter.is_empty() {
856 let sql = format!(
857 "SELECT m.id, m.name, m.body
858 FROM memories m
859 WHERE m.namespace = ?1
860 AND m.deleted_at IS NULL
861 AND NOT EXISTS (
862 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
863 )
864 ORDER BY m.id
865 {limit_clause}"
866 );
867 let mut stmt = conn.prepare(&sql)?;
868 let rows = stmt
869 .query_map(rusqlite::params![namespace], |r| {
870 Ok((
871 r.get::<_, i64>(0)?,
872 r.get::<_, String>(1)?,
873 r.get::<_, String>(2)?,
874 ))
875 })?
876 .collect::<Result<Vec<_>, _>>()?;
877 Ok(rows)
878 } else {
879 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
881 .map(|i| format!("?{i}"))
882 .collect();
883 let in_clause = placeholders.join(", ");
884 let sql = format!(
885 "SELECT m.id, m.name, m.body
886 FROM memories m
887 WHERE m.namespace = ?1
888 AND m.deleted_at IS NULL
889 AND m.name IN ({in_clause})
890 AND NOT EXISTS (
891 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
892 )
893 ORDER BY m.id
894 {limit_clause}"
895 );
896 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
897 params_vec.push(&namespace);
898 for n in name_filter {
899 params_vec.push(n);
900 }
901 let mut stmt = conn.prepare(&sql)?;
902 let rows = stmt
903 .query_map(
904 rusqlite::params_from_iter(params_vec.iter().copied()),
905 |r| {
906 Ok((
907 r.get::<_, i64>(0)?,
908 r.get::<_, String>(1)?,
909 r.get::<_, String>(2)?,
910 ))
911 },
912 )?
913 .collect::<Result<Vec<_>, _>>()?;
914 Ok(rows)
915 }
916}
917
918fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
923 let content = std::fs::read_to_string(path).map_err(|e| {
924 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
925 })?;
926 let mut seen = std::collections::HashSet::new();
927 let mut out = Vec::new();
928 for line in content.lines() {
929 let trimmed = line.trim();
930 if trimmed.is_empty() || trimmed.starts_with('#') {
931 continue;
932 }
933 if seen.insert(trimmed.to_string()) {
934 out.push(trimmed.to_string());
935 }
936 }
937 Ok(out)
938}
939
940fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
942 let mut combined: Vec<String> = args.names.clone();
943 if let Some(p) = &args.names_file {
944 let from_file = read_names_file(p)?;
945 for n in from_file {
946 if !combined.contains(&n) {
947 combined.push(n);
948 }
949 }
950 }
951 Ok(combined)
952}
953
954fn scan_entities_without_description(
958 conn: &Connection,
959 namespace: &str,
960 limit: Option<usize>,
961) -> Result<Vec<(i64, String, String)>, AppError> {
962 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
963 let sql = format!(
964 "SELECT id, name, type
965 FROM entities
966 WHERE namespace = ?1
967 AND (description IS NULL OR description = '')
968 ORDER BY id
969 {limit_clause}"
970 );
971 let mut stmt = conn.prepare(&sql)?;
972 let rows = stmt
973 .query_map(rusqlite::params![namespace], |r| {
974 Ok((
975 r.get::<_, i64>(0)?,
976 r.get::<_, String>(1)?,
977 r.get::<_, String>(2)?,
978 ))
979 })?
980 .collect::<Result<Vec<_>, _>>()?;
981 Ok(rows)
982}
983
984fn scan_short_body_memories(
988 conn: &Connection,
989 namespace: &str,
990 min_chars: usize,
991 limit: Option<usize>,
992) -> Result<Vec<(i64, String, String)>, AppError> {
993 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
994 let sql = format!(
995 "SELECT m.id, m.name, m.body
996 FROM memories m
997 WHERE m.namespace = ?1
998 AND m.deleted_at IS NULL
999 AND LENGTH(COALESCE(m.body,'')) < ?2
1000 ORDER BY m.id
1001 {limit_clause}"
1002 );
1003 let mut stmt = conn.prepare(&sql)?;
1004 let rows = stmt
1005 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1006 Ok((
1007 r.get::<_, i64>(0)?,
1008 r.get::<_, String>(1)?,
1009 r.get::<_, String>(2)?,
1010 ))
1011 })?
1012 .collect::<Result<Vec<_>, _>>()?;
1013 Ok(rows)
1014}
1015
1016fn scan_memories_without_embeddings(
1020 conn: &Connection,
1021 namespace: &str,
1022 limit: Option<usize>,
1023 name_filter: &[String],
1024) -> Result<Vec<(i64, String, String)>, AppError> {
1025 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1026
1027 if name_filter.is_empty() {
1028 let sql = format!(
1029 "SELECT m.id, m.name, COALESCE(m.body,'')
1030 FROM memories m
1031 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1032 WHERE m.namespace = ?1
1033 AND m.deleted_at IS NULL
1034 AND me.memory_id IS NULL
1035 ORDER BY m.id
1036 {limit_clause}"
1037 );
1038 let mut stmt = conn.prepare(&sql)?;
1039 let rows = stmt
1040 .query_map(rusqlite::params![namespace], |r| {
1041 Ok((
1042 r.get::<_, i64>(0)?,
1043 r.get::<_, String>(1)?,
1044 r.get::<_, String>(2)?,
1045 ))
1046 })?
1047 .collect::<Result<Vec<_>, _>>()?;
1048 Ok(rows)
1049 } else {
1050 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1051 .map(|i| format!("?{i}"))
1052 .collect();
1053 let in_clause = placeholders.join(", ");
1054 let sql = format!(
1055 "SELECT m.id, m.name, COALESCE(m.body,'')
1056 FROM memories m
1057 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1058 WHERE m.namespace = ?1
1059 AND m.deleted_at IS NULL
1060 AND m.name IN ({in_clause})
1061 AND me.memory_id IS NULL
1062 ORDER BY m.id
1063 {limit_clause}"
1064 );
1065 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1066 params_vec.push(&namespace);
1067 for n in name_filter {
1068 params_vec.push(n);
1069 }
1070 let mut stmt = conn.prepare(&sql)?;
1071 let rows = stmt
1072 .query_map(
1073 rusqlite::params_from_iter(params_vec.iter().copied()),
1074 |r| {
1075 Ok((
1076 r.get::<_, i64>(0)?,
1077 r.get::<_, String>(1)?,
1078 r.get::<_, String>(2)?,
1079 ))
1080 },
1081 )?
1082 .collect::<Result<Vec<_>, _>>()?;
1083 Ok(rows)
1084 }
1085}
1086
1087#[allow(clippy::type_complexity)]
1089fn scan_weight_candidates(
1090 conn: &Connection,
1091 namespace: &str,
1092 limit: Option<usize>,
1093) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1094 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1095 let sql = format!(
1096 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1097 FROM relationships r \
1098 JOIN entities e1 ON e1.id = r.source_id \
1099 JOIN entities e2 ON e2.id = r.target_id \
1100 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1101 ORDER BY r.weight DESC {limit_clause}"
1102 );
1103 let mut stmt = conn.prepare(&sql)?;
1104 let rows = stmt
1105 .query_map(rusqlite::params![namespace], |r| {
1106 Ok((
1107 r.get::<_, i64>(0)?,
1108 r.get::<_, String>(1)?,
1109 r.get::<_, String>(2)?,
1110 r.get::<_, String>(3)?,
1111 r.get::<_, f64>(4)?,
1112 ))
1113 })?
1114 .collect::<Result<Vec<_>, _>>()?;
1115 Ok(rows)
1116}
1117
1118fn scan_generic_relations(
1120 conn: &Connection,
1121 namespace: &str,
1122 limit: Option<usize>,
1123) -> Result<Vec<(i64, String, String, String)>, AppError> {
1124 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1125 let sql = format!(
1126 "SELECT r.id, e1.name, e2.name, r.relation \
1127 FROM relationships r \
1128 JOIN entities e1 ON e1.id = r.source_id \
1129 JOIN entities e2 ON e2.id = r.target_id \
1130 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1131 ORDER BY r.id {limit_clause}"
1132 );
1133 let mut stmt = conn.prepare(&sql)?;
1134 let rows = stmt
1135 .query_map(rusqlite::params![namespace], |r| {
1136 Ok((
1137 r.get::<_, i64>(0)?,
1138 r.get::<_, String>(1)?,
1139 r.get::<_, String>(2)?,
1140 r.get::<_, String>(3)?,
1141 ))
1142 })?
1143 .collect::<Result<Vec<_>, _>>()?;
1144 Ok(rows)
1145}
1146
1147fn persist_memory_bindings(
1156 conn: &Connection,
1157 namespace: &str,
1158 memory_id: i64,
1159 entities_json: &serde_json::Value,
1160 rels_json: &serde_json::Value,
1161) -> Result<(usize, usize), AppError> {
1162 #[derive(Deserialize)]
1163 struct EntityItem {
1164 name: String,
1165 entity_type: String,
1166 }
1167 #[derive(Deserialize)]
1168 struct RelItem {
1169 source: String,
1170 target: String,
1171 relation: String,
1172 strength: f64,
1173 }
1174
1175 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1176 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1177
1178 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1179 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1180
1181 let mut ent_count = 0usize;
1182 let mut rel_count = 0usize;
1183
1184 for item in &extracted_entities {
1185 let entity_type = match item.entity_type.parse::<EntityType>() {
1186 Ok(et) => et,
1187 Err(_) => {
1188 tracing::warn!(
1189 target: "enrich",
1190 entity = %item.name,
1191 entity_type = %item.entity_type,
1192 "entity type not recognized, skipping"
1193 );
1194 continue;
1195 }
1196 };
1197 match entities::upsert_entity(
1198 conn,
1199 namespace,
1200 &NewEntity {
1201 name: item.name.clone(),
1202 entity_type,
1203 description: None,
1204 },
1205 ) {
1206 Ok(eid) => {
1207 let _ = entities::link_memory_entity(conn, memory_id, eid);
1208 ent_count += 1;
1209 }
1210 Err(e) => {
1211 tracing::warn!(
1212 target: "enrich",
1213 entity = %item.name,
1214 error = %e,
1215 "entity upsert skipped"
1216 );
1217 }
1218 }
1219 }
1220
1221 for rel in &extracted_rels {
1222 let normalized = crate::parsers::normalize_relation(&rel.relation);
1223 crate::parsers::warn_if_non_canonical(&normalized);
1224
1225 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1228 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1229 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1230 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1231 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1232 let new_rel = NewRelationship {
1233 source: rel.source.clone(),
1234 target: rel.target.clone(),
1235 relation: normalized,
1236 strength: rel.strength,
1237 description: None,
1238 };
1239 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1240 rel_count += 1;
1241 }
1242 }
1243 }
1244
1245 Ok((ent_count, rel_count))
1246}
1247
1248fn persist_entity_description(
1250 conn: &Connection,
1251 entity_id: i64,
1252 description: &str,
1253) -> Result<(), AppError> {
1254 conn.execute(
1255 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1256 rusqlite::params![description, entity_id],
1257 )?;
1258 Ok(())
1259}
1260
1261fn reembed_memory_vector(
1262 conn: &Connection,
1263 namespace: &str,
1264 memory_id: i64,
1265 memory_name: &str,
1266 memory_type: &str,
1267 body: &str,
1268 paths: &crate::paths::AppPaths,
1269) -> Result<(), AppError> {
1270 let snippet: String = body.chars().take(200).collect();
1271 let embedding = crate::embedder::embed_passage_local(&paths.models, body)?;
1272 memories::upsert_vec(
1273 conn,
1274 memory_id,
1275 namespace,
1276 memory_type,
1277 &embedding,
1278 memory_name,
1279 &snippet,
1280 )?;
1281 Ok(())
1282}
1283
1284fn persist_enriched_body(
1289 conn: &Connection,
1290 namespace: &str,
1291 memory_id: i64,
1292 memory_name: &str,
1293 new_body: &str,
1294 paths: &crate::paths::AppPaths,
1295) -> Result<(), AppError> {
1296 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1298 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1299 rusqlite::params![memory_id],
1300 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1301 )?;
1302
1303 let memory_type: String = conn.query_row(
1304 "SELECT type FROM memories WHERE id=?1",
1305 rusqlite::params![memory_id],
1306 |r| r.get(0),
1307 )?;
1308
1309 let description: String = conn.query_row(
1310 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1311 rusqlite::params![memory_id],
1312 |r| r.get(0),
1313 )?;
1314
1315 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1316
1317 let new_memory = memories::NewMemory {
1318 namespace: namespace.to_string(),
1319 name: memory_name.to_string(),
1320 memory_type: memory_type.clone(),
1321 description: description.clone(),
1322 body: new_body.to_string(),
1323 body_hash,
1324 session_id: None,
1325 source: "agent".to_string(),
1326 metadata: serde_json::json!({
1327 "operation": "body-enrich",
1328 "orig_chars": old_body.chars().count(),
1329 "new_chars": new_body.chars().count(),
1330 }),
1331 };
1332
1333 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1337 let version_metadata = serde_json::json!({
1338 "operation": "body-enrich",
1339 "orig_chars": old_body.chars().count(),
1340 "new_chars": new_body.chars().count(),
1341 })
1342 .to_string();
1343 crate::storage::versions::insert_version(
1344 conn,
1345 memory_id,
1346 next_version,
1347 memory_name,
1348 &memory_type,
1349 &description,
1350 new_body,
1351 &version_metadata,
1352 Some("enrich"),
1353 "edit",
1354 )?;
1355
1356 memories::update(conn, memory_id, &new_memory, None)?;
1357 memories::sync_fts_after_update(
1358 conn,
1359 memory_id,
1360 &old_name,
1361 &old_desc,
1362 &old_body,
1363 &new_memory.name,
1364 &new_memory.description,
1365 &new_memory.body,
1366 )?;
1367
1368 if let Err(e) = reembed_memory_vector(
1370 conn,
1371 namespace,
1372 memory_id,
1373 memory_name,
1374 &memory_type,
1375 new_body,
1376 paths,
1377 ) {
1378 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1379 }
1380
1381 Ok(())
1382}
1383
/// Reports whether `value` equals `default`.
///
/// Used to tell whether a CLI flag still holds its default value. It cannot
/// distinguish "flag omitted" from "flag passed explicitly with the default value".
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1398
1399fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1414 const DEFAULT_TIMEOUT: u64 = 300;
1415
1416 let mut conflicts: Vec<String> = Vec::new();
1417
1418 match args.mode {
1419 EnrichMode::ClaudeCode => {
1420 if args.codex_binary.is_some() {
1421 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1422 }
1423 if args.codex_model.is_some() {
1424 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1425 }
1426 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1427 conflicts.push(format!(
1428 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1429 args.codex_timeout
1430 ));
1431 }
1432 }
1433 EnrichMode::Codex => {
1434 if args.claude_binary.is_some() {
1435 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1436 }
1437 if args.claude_model.is_some() {
1438 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1439 }
1440 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1441 conflicts.push(format!(
1442 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1443 args.claude_timeout
1444 ));
1445 }
1446 if args.max_cost_usd.is_some() {
1447 conflicts.push(
1448 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1449 .to_string(),
1450 );
1451 }
1452 }
1453 }
1454
1455 if !conflicts.is_empty() {
1456 return Err(AppError::Validation(format!(
1457 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1458 args.mode,
1459 conflicts.join("\n - ")
1460 )));
1461 }
1462
1463 Ok(())
1464}
1465
1466pub fn run(args: &EnrichArgs) -> Result<(), AppError> {
1470 validate_mode_conditional_flags_enrich(args)?;
1473 let started = Instant::now();
1474
1475 let paths = AppPaths::resolve(args.db.as_deref())?;
1476 ensure_db_ready(&paths)?;
1477 let conn = open_rw(&paths.db)?;
1478 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1479
1480 let wait_secs = args.wait_job_singleton;
1486 let force_flag = args.force_job_singleton;
1487 let _singleton = crate::lock::acquire_job_singleton(
1488 crate::lock::JobType::Enrich,
1489 &namespace,
1490 &paths.db,
1491 wait_secs,
1492 force_flag,
1493 )?;
1494
1495 let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1497 None
1498 } else {
1499 Some(match args.mode {
1500 EnrichMode::ClaudeCode => {
1501 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1502 let version = super::claude_runner::validate_claude_version(&bin)?;
1503 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1504 emit_json(&PhaseEvent {
1505 phase: "validate",
1506 binary_path: bin.to_str(),
1507 version: Some(&version),
1508 items_total: None,
1509 items_pending: None,
1510 llm_parallelism: None,
1511 });
1512 bin
1513 }
1514 EnrichMode::Codex => {
1515 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1516 emit_json(&PhaseEvent {
1517 phase: "validate",
1518 binary_path: bin.to_str(),
1519 version: None,
1520 items_total: None,
1521 items_pending: None,
1522 llm_parallelism: None,
1523 });
1524 bin
1525 }
1526 })
1527 };
1528
1529 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1533 let load = crate::system_load::load_average_one();
1534 let n = crate::system_load::ncpus();
1535 return Err(AppError::Validation(format!(
1536 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1537 pass --no-max-load-check to override (not recommended)"
1538 )));
1539 }
1540
1541 if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1548 {
1549 let preflight_result = run_preflight_probe(args);
1550 match preflight_result {
1551 PreflightOutcome::Healthy => {
1552 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1553 }
1554 PreflightOutcome::RateLimited { reason, suggestion } => {
1555 if let Some(fallback) = args.fallback_mode.clone() {
1556 if fallback != args.mode {
1557 return Err(AppError::Validation(format!(
1567 "preflight detected rate limit on {mode:?}: {reason}; \
1568 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1569 mode = args.mode
1570 )));
1571 }
1572 return Err(AppError::Validation(format!(
1573 "preflight detected rate limit on {mode:?}: {reason}; \
1574 --fallback-mode matches --mode, no recovery possible",
1575 mode = args.mode
1576 )));
1577 }
1578 return Err(AppError::Validation(format!(
1579 "preflight detected rate limit on {mode:?}: {reason}; \
1580 {suggestion}; pass --fallback-mode codex to recover",
1581 mode = args.mode
1582 )));
1583 }
1584 PreflightOutcome::Error(e) => {
1585 return Err(e);
1586 }
1587 }
1588 }
1589
1590 let scan_result = scan_operation(&conn, &namespace, args)?;
1592 let total = scan_result.len();
1593
1594 emit_json(&PhaseEvent {
1595 phase: "scan",
1596 binary_path: None,
1597 version: None,
1598 items_total: Some(total),
1599 items_pending: Some(total),
1600 llm_parallelism: Some(args.llm_parallelism),
1601 });
1602
1603 if args.dry_run {
1605 for (idx, key) in scan_result.iter().enumerate() {
1606 emit_json(&ItemEvent {
1607 item: key,
1608 status: "preview",
1609 memory_id: None,
1610 entity_id: None,
1611 entities: None,
1612 rels: None,
1613 chars_before: None,
1614 chars_after: None,
1615 cost_usd: None,
1616 elapsed_ms: None,
1617 error: None,
1618 index: idx,
1619 total,
1620 });
1621 }
1622 emit_json(&EnrichSummary {
1623 summary: true,
1624 operation: format!("{:?}", args.operation),
1625 items_total: total,
1626 completed: 0,
1627 failed: 0,
1628 skipped: 0,
1629 cost_usd: 0.0,
1630 elapsed_ms: started.elapsed().as_millis() as u64,
1631 });
1632 return Ok(());
1633 }
1634
1635 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1639
1640 if args.resume {
1641 let reset = queue_conn
1642 .execute(
1643 "UPDATE queue SET status='pending' WHERE status='processing'",
1644 [],
1645 )
1646 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1647 if reset > 0 {
1648 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1649 }
1650 }
1651
1652 if args.retry_failed {
1653 let count = queue_conn
1654 .execute(
1655 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1656 [],
1657 )
1658 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1659 tracing::info!(target: "enrich", count, "retrying failed items");
1660 }
1661
1662 if !args.resume && !args.retry_failed {
1663 queue_conn
1664 .execute("DELETE FROM queue", [])
1665 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1666 }
1667
1668 for (idx, key) in scan_result.iter().enumerate() {
1670 let item_type = match args.operation {
1671 EnrichOperation::EntityDescriptions => "entity",
1672 _ => "memory",
1673 };
1674 if let Err(e) = queue_conn.execute(
1675 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1676 rusqlite::params![key, item_type],
1677 ) {
1678 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1679 }
1680 let _ = idx; }
1682
1683 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1686 if parallelism > 1 {
1687 tracing::info!(
1688 target: "enrich",
1689 llm_parallelism = parallelism,
1690 "parallel LLM processing with bounded thread pool"
1691 );
1692 }
1693 if parallelism > 4 {
1697 match args.mode {
1698 EnrichMode::ClaudeCode => {
1699 tracing::warn!(
1700 target: "enrich",
1701 llm_parallelism = parallelism,
1702 recommended_max = 4,
1703 mode = "claude-code",
1704 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1705 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1706 to cut MCP children (G28-A)"
1707 );
1708 }
1709 EnrichMode::Codex if parallelism > 16 => {
1710 tracing::warn!(
1711 target: "enrich",
1712 llm_parallelism = parallelism,
1713 recommended_max = 16,
1714 mode = "codex",
1715 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1716 consider --llm-parallelism 8 for safer concurrency"
1717 );
1718 }
1719 EnrichMode::Codex => {
1720 }
1724 }
1725 }
1726
1727 let mut completed = 0usize;
1728 let mut failed = 0usize;
1729 let mut skipped = 0usize;
1730 let mut cost_total = 0.0f64;
1731 let mut oauth_detected = false;
1732 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1733 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1734 let enrich_started = std::time::Instant::now();
1735
1736 let provider_timeout = match args.mode {
1737 EnrichMode::ClaudeCode => args.claude_timeout,
1738 EnrichMode::Codex => args.codex_timeout,
1739 };
1740
1741 let provider_model: Option<&str> = match args.mode {
1742 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1743 EnrichMode::Codex => args.codex_model.as_deref(),
1744 };
1745
1746 if parallelism > 1 {
1750 let stdout_mu = parking_lot::Mutex::new(());
1751 let budget = args.max_cost_usd;
1752 let operation = args.operation.clone();
1753 let mode = args.mode.clone();
1754 let min_oc = args.min_output_chars;
1755 let max_oc = args.max_output_chars;
1756 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1757
1758 struct WorkerResult {
1759 completed: usize,
1760 failed: usize,
1761 skipped: usize,
1762 cost: f64,
1763 oauth: bool,
1764 }
1765
1766 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1767 let handles: Vec<_> = (0..parallelism)
1768 .map(|worker_id| {
1769 let stdout_mu = &stdout_mu;
1770 let paths = &paths;
1771 let namespace = &namespace;
1772 let provider_binary = provider_binary.as_deref();
1773 let operation = &operation;
1774 let mode = &mode;
1775 let prompt_tpl = prompt_tpl.as_deref();
1776 s.spawn(move || {
1777 let w_conn = match open_rw(&paths.db) {
1778 Ok(c) => c,
1779 Err(e) => {
1780 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1781 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1782 }
1783 };
1784 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1785 Ok(c) => c,
1786 Err(e) => {
1787 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1788 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1789 }
1790 };
1791 let mut w_completed = 0usize;
1792 let mut w_failed = 0usize;
1793 let mut w_skipped = 0usize;
1794 let mut w_cost = 0.0f64;
1795 let mut w_oauth = false;
1796 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1797 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1798 let mut w_breaker = crate::retry::CircuitBreaker::new(
1804 args.circuit_breaker_threshold.max(1),
1805 std::time::Duration::from_secs(60),
1806 );
1807
1808 loop {
1809 if crate::shutdown_requested() {
1810 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1811 break;
1812 }
1813 if let Some(b) = budget {
1814 if !w_oauth && w_cost >= b {
1815 break;
1816 }
1817 }
1818 let pending: Option<(i64, String, String)> = w_queue
1819 .query_row(
1820 "UPDATE queue SET status='processing', attempt=attempt+1 \
1821 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1822 RETURNING id, item_key, item_type",
1823 [],
1824 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1825 )
1826 .ok();
1827 let (queue_id, item_key, _item_type) = match pending {
1828 Some(p) => p,
1829 None => break,
1830 };
1831 let item_started = Instant::now();
1832 let current_index = w_completed + w_failed + w_skipped;
1833
1834 let call_result = match operation {
1835 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1836 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1837 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths),
1838 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths),
1839 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1840 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1841 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1842 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1843 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1844 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1845 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1846 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1847 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1848 };
1849
1850 match call_result {
1851 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1852 if is_oauth { w_oauth = true; }
1853 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1854 let _ = w_queue.execute(
1855 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1856 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1857 );
1858 w_completed += 1;
1859 if !is_oauth { w_cost += cost; }
1860 let _ = w_breaker
1862 .record(crate::retry::AttemptOutcome::Success);
1863 let _guard = stdout_mu.lock();
1864 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1865 }
1866 Ok(EnrichItemResult::Skipped { reason }) => {
1867 w_skipped += 1;
1868 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1869 let _guard = stdout_mu.lock();
1870 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1871 }
1872 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1873 w_skipped += 1;
1879 let reason = format!(
1880 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1881 );
1882 let _ = w_queue.execute(
1883 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1884 rusqlite::params![reason, queue_id],
1885 );
1886 let _guard = stdout_mu.lock();
1887 emit_json(&ItemEvent {
1888 item: &item_key,
1889 status: "preservation_failed",
1890 memory_id: None,
1891 entity_id: None,
1892 entities: None,
1893 rels: None,
1894 chars_before: Some(chars_before),
1895 chars_after: Some(chars_after),
1896 cost_usd: None,
1897 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1898 error: Some(reason),
1899 index: current_index,
1900 total,
1901 });
1902 }
1903 Err(e) => {
1904 let err_str = format!("{e}");
1905 if matches!(e, AppError::RateLimited { .. }) {
1906 if crate::retry::is_kill_switch_active() {
1907 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1908 } else if std::time::Instant::now() >= w_deadline {
1909 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1910 } else {
1911 let half = w_backoff / 2;
1912 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1913 let actual_wait = half + jitter;
1914 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1915 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1916 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1917 w_backoff = (w_backoff * 2).min(900);
1918 continue;
1919 }
1920 }
1921 w_failed += 1;
1922 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1923 let _guard = stdout_mu.lock();
1924 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1925 let breaker_opened = w_breaker
1927 .record(crate::retry::AttemptOutcome::HardFailure);
1928 if breaker_opened {
1929 tracing::error!(target: "enrich",
1930 consecutive_failures = w_breaker.consecutive_failures(),
1931 "circuit breaker opened — aborting worker"
1932 );
1933 break;
1934 }
1935 }
1936 }
1937 }
1938 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1939 })
1940 })
1941 .collect();
1942 handles
1943 .into_iter()
1944 .map(|h| {
1945 h.join().unwrap_or(WorkerResult {
1946 completed: 0,
1947 failed: 0,
1948 skipped: 0,
1949 cost: 0.0,
1950 oauth: false,
1951 })
1952 })
1953 .collect()
1954 });
1955
1956 for r in &results {
1957 completed += r.completed;
1958 failed += r.failed;
1959 skipped += r.skipped;
1960 cost_total += r.cost;
1961 oauth_detected |= r.oauth;
1962 }
1963 } else {
1964 loop {
1966 if crate::shutdown_requested() {
1967 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
1968 break;
1969 }
1970
1971 if let Some(budget) = args.max_cost_usd {
1973 if !oauth_detected && cost_total >= budget {
1974 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1975 break;
1976 }
1977 }
1978
1979 let pending: Option<(i64, String, String)> = queue_conn
1981 .query_row(
1982 "UPDATE queue SET status='processing', attempt=attempt+1 \
1983 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1984 RETURNING id, item_key, item_type",
1985 [],
1986 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1987 )
1988 .ok();
1989
1990 let (queue_id, item_key, item_type) = match pending {
1991 Some(p) => p,
1992 None => break,
1993 };
1994
1995 let item_started = Instant::now();
1996 let current_index = completed + failed + skipped;
1997
1998 let call_result = match args.operation {
1999 EnrichOperation::MemoryBindings => call_memory_bindings(
2000 &conn,
2001 &namespace,
2002 &item_key,
2003 provider_binary
2004 .as_deref()
2005 .expect("provider binary required"),
2006 provider_model,
2007 provider_timeout,
2008 &args.mode,
2009 ),
2010 EnrichOperation::EntityDescriptions => call_entity_description(
2011 &conn,
2012 &namespace,
2013 &item_key,
2014 provider_binary
2015 .as_deref()
2016 .expect("provider binary required"),
2017 provider_model,
2018 provider_timeout,
2019 &args.mode,
2020 ),
2021 EnrichOperation::BodyEnrich => call_body_enrich(
2022 &conn,
2023 &namespace,
2024 &item_key,
2025 provider_binary
2026 .as_deref()
2027 .expect("provider binary required"),
2028 provider_model,
2029 provider_timeout,
2030 &args.mode,
2031 args.min_output_chars,
2032 args.max_output_chars,
2033 args.prompt_template.as_deref(),
2034 args.preserve_threshold,
2035 &paths,
2036 ),
2037 EnrichOperation::ReEmbed => call_reembed(&conn, &namespace, &item_key, &paths),
2038 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2039 &conn,
2040 &namespace,
2041 &item_key,
2042 provider_binary
2043 .as_deref()
2044 .expect("provider binary required"),
2045 provider_model,
2046 provider_timeout,
2047 &args.mode,
2048 ),
2049 EnrichOperation::RelationReclassify => call_relation_reclassify(
2050 &conn,
2051 &namespace,
2052 &item_key,
2053 provider_binary
2054 .as_deref()
2055 .expect("provider binary required"),
2056 provider_model,
2057 provider_timeout,
2058 &args.mode,
2059 ),
2060 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2061 call_entity_connect(
2062 &conn,
2063 &namespace,
2064 &item_key,
2065 provider_binary
2066 .as_deref()
2067 .expect("provider binary required"),
2068 provider_model,
2069 provider_timeout,
2070 &args.mode,
2071 )
2072 }
2073 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2074 &conn,
2075 &namespace,
2076 &item_key,
2077 provider_binary
2078 .as_deref()
2079 .expect("provider binary required"),
2080 provider_model,
2081 provider_timeout,
2082 &args.mode,
2083 ),
2084 EnrichOperation::DescriptionEnrich => call_description_enrich(
2085 &conn,
2086 &namespace,
2087 &item_key,
2088 provider_binary
2089 .as_deref()
2090 .expect("provider binary required"),
2091 provider_model,
2092 provider_timeout,
2093 &args.mode,
2094 ),
2095 EnrichOperation::DomainClassify => call_domain_classify(
2096 &conn,
2097 &namespace,
2098 &item_key,
2099 provider_binary
2100 .as_deref()
2101 .expect("provider binary required"),
2102 provider_model,
2103 provider_timeout,
2104 &args.mode,
2105 ),
2106 EnrichOperation::GraphAudit => call_graph_audit(
2107 &conn,
2108 &namespace,
2109 &item_key,
2110 provider_binary
2111 .as_deref()
2112 .expect("provider binary required"),
2113 provider_model,
2114 provider_timeout,
2115 &args.mode,
2116 ),
2117 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2118 &conn,
2119 &namespace,
2120 &item_key,
2121 provider_binary
2122 .as_deref()
2123 .expect("provider binary required"),
2124 provider_model,
2125 provider_timeout,
2126 &args.mode,
2127 ),
2128 EnrichOperation::BodyExtract => call_body_extract(
2129 &conn,
2130 &namespace,
2131 &item_key,
2132 provider_binary
2133 .as_deref()
2134 .expect("provider binary required"),
2135 provider_model,
2136 provider_timeout,
2137 &args.mode,
2138 ),
2139 };
2140
2141 match call_result {
2142 Ok(EnrichItemResult::Done {
2143 memory_id,
2144 entity_id,
2145 entities,
2146 rels,
2147 chars_before,
2148 chars_after,
2149 cost,
2150 is_oauth,
2151 }) => {
2152 if is_oauth && !oauth_detected {
2153 oauth_detected = true;
2154 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2155 }
2156 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2157
2158 let persist_err: Option<String> = match args.operation {
2160 EnrichOperation::MemoryBindings => {
2161 None
2163 }
2164 EnrichOperation::EntityDescriptions => {
2165 None
2167 }
2168 EnrichOperation::BodyEnrich => {
2169 None
2171 }
2172 _ => {
2173 None
2175 }
2176 };
2177
2178 if let Err(e) = queue_conn.execute(
2179 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2180 rusqlite::params![
2181 memory_id,
2182 entity_id,
2183 entities as i64,
2184 rels as i64,
2185 cost,
2186 item_started.elapsed().as_millis() as i64,
2187 queue_id
2188 ],
2189 ) {
2190 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2191 }
2192
2193 if persist_err.is_none() {
2194 completed += 1;
2195 if !is_oauth {
2196 cost_total += cost;
2197 }
2198 emit_json(&ItemEvent {
2199 item: &item_key,
2200 status: "done",
2201 memory_id,
2202 entity_id,
2203 entities: Some(entities),
2204 rels: Some(rels),
2205 chars_before,
2206 chars_after,
2207 cost_usd: if is_oauth { None } else { Some(cost) },
2208 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2209 error: None,
2210 index: current_index,
2211 total,
2212 });
2213 } else {
2214 failed += 1;
2215 emit_json(&ItemEvent {
2216 item: &item_key,
2217 status: "failed",
2218 memory_id: None,
2219 entity_id: None,
2220 entities: None,
2221 rels: None,
2222 chars_before: None,
2223 chars_after: None,
2224 cost_usd: None,
2225 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2226 error: persist_err,
2227 index: current_index,
2228 total,
2229 });
2230 }
2231 }
2232 Ok(EnrichItemResult::Skipped { reason }) => {
2233 skipped += 1;
2234 if let Err(e) = queue_conn.execute(
2235 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2236 rusqlite::params![reason, queue_id],
2237 ) {
2238 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2239 }
2240 emit_json(&ItemEvent {
2241 item: &item_key,
2242 status: "skipped",
2243 memory_id: None,
2244 entity_id: None,
2245 entities: None,
2246 rels: None,
2247 chars_before: None,
2248 chars_after: None,
2249 cost_usd: None,
2250 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2251 error: None,
2252 index: current_index,
2253 total,
2254 });
2255 }
2256 Ok(EnrichItemResult::PreservationFailed {
2257 score,
2258 threshold,
2259 chars_before,
2260 chars_after,
2261 }) => {
2262 skipped += 1;
2269 let reason = format!(
2270 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2271 );
2272 if let Err(qe) = queue_conn.execute(
2273 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2274 rusqlite::params![reason, queue_id],
2275 ) {
2276 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2277 }
2278 emit_json(&ItemEvent {
2279 item: &item_key,
2280 status: "preservation_failed",
2281 memory_id: None,
2282 entity_id: None,
2283 entities: None,
2284 rels: None,
2285 chars_before: Some(chars_before),
2286 chars_after: Some(chars_after),
2287 cost_usd: None,
2288 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2289 error: Some(reason),
2290 index: current_index,
2291 total,
2292 });
2293 }
2294 Err(e) => {
2295 let err_str = format!("{e}");
2296 if matches!(e, AppError::RateLimited { .. }) {
2297 if crate::retry::is_kill_switch_active() {
2298 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2299 } else if std::time::Instant::now() >= rate_limit_deadline {
2300 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2301 } else {
2302 let half = backoff_secs / 2;
2303 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2304 let actual_wait = half + jitter;
2305 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2306 if let Err(qe) = queue_conn.execute(
2307 "UPDATE queue SET status='pending' WHERE id=?1",
2308 rusqlite::params![queue_id],
2309 ) {
2310 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2311 }
2312 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2313 backoff_secs = (backoff_secs * 2).min(900);
2314 continue;
2315 }
2316 }
2317
2318 failed += 1;
2319 if let Err(qe) = queue_conn.execute(
2320 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2321 rusqlite::params![err_str, queue_id],
2322 ) {
2323 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2324 }
2325 emit_json(&ItemEvent {
2326 item: &item_key,
2327 status: "failed",
2328 memory_id: None,
2329 entity_id: None,
2330 entities: None,
2331 rels: None,
2332 chars_before: None,
2333 chars_after: None,
2334 cost_usd: None,
2335 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2336 error: Some(err_str),
2337 index: current_index,
2338 total,
2339 });
2340 }
2341 }
2342
2343 let _ = item_type; }
2345 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2348 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2349
2350 emit_json(&EnrichSummary {
2351 summary: true,
2352 operation: format!("{:?}", args.operation),
2353 items_total: total,
2354 completed,
2355 failed,
2356 skipped,
2357 cost_usd: cost_total,
2358 elapsed_ms: started.elapsed().as_millis() as u64,
2359 });
2360
2361 if failed == 0 {
2362 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2363 }
2364
2365 Ok(())
2366}
2367
/// Outcome of processing a single enrichment queue item.
///
/// The driver loop maps each variant onto a queue status update and an emitted
/// `ItemEvent`: `Done` → `done`, `Skipped` → `skipped`, and `PreservationFailed`
/// → queue status `skipped` with a `preservation_failed` event.
#[derive(Debug)]
enum EnrichItemResult {
    /// The operation ran and its result was persisted.
    Done {
        /// Memory row the operation acted on, if it targets a memory.
        memory_id: Option<i64>,
        /// Entity row the operation acted on, if it targets an entity.
        entity_id: Option<i64>,
        /// Number of entities reported by the operation.
        entities: usize,
        /// Number of relationships reported by the operation (graph audit reuses
        /// this slot for its issue count).
        rels: usize,
        /// Length of the text before a rewrite, when the operation rewrites text.
        chars_before: Option<usize>,
        /// Length of the text after a rewrite, when the operation rewrites text.
        chars_after: Option<usize>,
        /// Cost reported by the LLM call (0.0 for operations that make none).
        cost: f64,
        /// When true the driver neither adds `cost` to the run total nor reports
        /// a `cost_usd` value for the item.
        is_oauth: bool,
    },
    /// The item was intentionally not processed; `reason` is stored in the queue.
    Skipped {
        reason: String,
    },
    /// The rewritten body did not preserve enough of the original content.
    PreservationFailed {
        /// Jaccard similarity score between original and rewritten text.
        score: f64,
        /// Minimum score that would have been accepted.
        threshold: f64,
        chars_before: usize,
        chars_after: usize,
    },
}
2397
2398fn call_memory_bindings(
2403 conn: &Connection,
2404 namespace: &str,
2405 memory_name: &str,
2406 binary: &Path,
2407 model: Option<&str>,
2408 timeout: u64,
2409 mode: &EnrichMode,
2410) -> Result<EnrichItemResult, AppError> {
2411 let (memory_id, body): (i64, String) = conn.query_row(
2413 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2414 rusqlite::params![namespace, memory_name],
2415 |r| Ok((r.get(0)?, r.get(1)?)),
2416 ).map_err(|e| match e {
2417 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2418 other => AppError::Database(other),
2419 })?;
2420
2421 if body.trim().is_empty() {
2422 return Ok(EnrichItemResult::Skipped {
2423 reason: "body is empty".to_string(),
2424 });
2425 }
2426
2427 let (value, cost, is_oauth) = match mode {
2428 EnrichMode::ClaudeCode => call_claude(
2429 binary,
2430 BINDINGS_PROMPT,
2431 BINDINGS_SCHEMA,
2432 &body,
2433 model,
2434 timeout,
2435 )?,
2436 EnrichMode::Codex => call_codex(
2437 binary,
2438 BINDINGS_PROMPT,
2439 BINDINGS_SCHEMA,
2440 &body,
2441 model,
2442 timeout,
2443 )?,
2444 };
2445
2446 let empty_arr = serde_json::Value::Array(vec![]);
2447 let entities_val = value.get("entities").unwrap_or(&empty_arr);
2448 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2449
2450 let (ent_count, rel_count) =
2451 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2452
2453 Ok(EnrichItemResult::Done {
2454 memory_id: Some(memory_id),
2455 entity_id: None,
2456 entities: ent_count,
2457 rels: rel_count,
2458 chars_before: None,
2459 chars_after: None,
2460 cost,
2461 is_oauth,
2462 })
2463}
2464
2465fn call_entity_description(
2466 conn: &Connection,
2467 namespace: &str,
2468 entity_name: &str,
2469 binary: &Path,
2470 model: Option<&str>,
2471 timeout: u64,
2472 mode: &EnrichMode,
2473) -> Result<EnrichItemResult, AppError> {
2474 let (entity_id, entity_type): (i64, String) = conn
2475 .query_row(
2476 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2477 rusqlite::params![namespace, entity_name],
2478 |r| Ok((r.get(0)?, r.get(1)?)),
2479 )
2480 .map_err(|e| match e {
2481 rusqlite::Error::QueryReturnedNoRows => {
2482 AppError::NotFound(format!("entity '{entity_name}' not found"))
2483 }
2484 other => AppError::Database(other),
2485 })?;
2486
2487 let prompt = format!(
2488 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2489 );
2490
2491 let (value, cost, is_oauth) = match mode {
2492 EnrichMode::ClaudeCode => call_claude(
2493 binary,
2494 &prompt,
2495 ENTITY_DESCRIPTION_SCHEMA,
2496 "",
2497 model,
2498 timeout,
2499 )?,
2500 EnrichMode::Codex => call_codex(
2501 binary,
2502 &prompt,
2503 ENTITY_DESCRIPTION_SCHEMA,
2504 "",
2505 model,
2506 timeout,
2507 )?,
2508 };
2509
2510 let description = value
2511 .get("description")
2512 .and_then(|v| v.as_str())
2513 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2514
2515 persist_entity_description(conn, entity_id, description)?;
2516
2517 Ok(EnrichItemResult::Done {
2518 memory_id: None,
2519 entity_id: Some(entity_id),
2520 entities: 0,
2521 rels: 0,
2522 chars_before: None,
2523 chars_after: None,
2524 cost,
2525 is_oauth,
2526 })
2527}
2528
/// Rewrites a memory body into a longer, enriched version via the LLM and
/// persists it only when the rewrite passes the preservation, idempotency and
/// growth checks.
///
/// The prompt is `prompt_template` (read from disk, rejected when larger than
/// `MAX_MEMORY_BODY_LEN` bytes) or `BODY_ENRICH_PROMPT_PREFIX`. It is followed by
/// an optional context section (name, type, description, namespace, up to 10
/// linked entity names) and the requested length bounds. The current body is
/// passed as the LLM input text.
///
/// NOTE(review): `min_output_chars` / `max_output_chars` only appear in the prompt
/// text; this function does not enforce them on the result.
///
/// # Returns
/// - `PreservationFailed` when `PreservationVerdict::evaluate` does not accept the
///   rewrite at `preserve_threshold`.
/// - `Skipped` when the rewrite's blake3 hash equals the original's, or when the
///   rewrite is not longer (in chars) than the original.
/// - `Done` (with char counts) after `persist_enriched_body` succeeds.
///
/// # Errors
/// `NotFound` for a missing or soft-deleted memory, `Database` for other SQLite
/// failures, `Io` / `LimitExceeded` for prompt-template problems, `Validation`
/// when the answer lacks `enriched_body`, plus LLM-call and persistence errors.
#[allow(clippy::too_many_arguments)]
fn call_body_enrich(
    conn: &Connection,
    namespace: &str,
    memory_name: &str,
    binary: &Path,
    model: Option<&str>,
    timeout: u64,
    mode: &EnrichMode,
    min_output_chars: usize,
    max_output_chars: usize,
    prompt_template: Option<&Path>,
    preserve_threshold: f64,
    paths: &crate::paths::AppPaths,
) -> Result<EnrichItemResult, AppError> {
    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
        .query_row(
            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
             FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
            rusqlite::params![namespace, memory_name],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
        )
        .map_err(|e| match e {
            rusqlite::Error::QueryReturnedNoRows => {
                AppError::NotFound(format!("memory '{memory_name}' not found"))
            }
            other => AppError::Database(other),
        })?;

    // Lengths are measured in Unicode scalar values, not bytes.
    let chars_before = body.chars().count();

    // Up to 10 entity names bound to this memory, used only as prompt context.
    // Rows that fail to decode are silently dropped by `filter_map`.
    let linked_entities: Vec<String> = {
        let mut stmt = conn.prepare_cached(
            "SELECT e.name FROM memory_entities me \
             JOIN entities e ON e.id = me.entity_id \
             WHERE me.memory_id = ?1 LIMIT 10",
        )?;
        let result: Vec<String> = stmt
            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
            .filter_map(|r| r.ok())
            .collect();
        drop(stmt);
        result
    };

    // A user-supplied template replaces the built-in prefix; stat it first so an
    // oversized file is rejected before it is read into memory.
    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
        let file_size = std::fs::metadata(tmpl_path)
            .map_err(|e| {
                AppError::Io(std::io::Error::new(
                    e.kind(),
                    format!("failed to stat prompt template: {e}"),
                ))
            })?
            .len();
        if file_size > MAX_MEMORY_BODY_LEN as u64 {
            return Err(AppError::LimitExceeded(
                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
            ));
        }
        std::fs::read_to_string(tmpl_path).map_err(|e| {
            AppError::Io(std::io::Error::new(
                e.kind(),
                format!("failed to read prompt template: {e}"),
            ))
        })?
    } else {
        BODY_ENRICH_PROMPT_PREFIX.to_string()
    };

    // The context block is emitted only when there is something beyond the
    // name/type/namespace to add (linked entities or a description).
    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
        let mut ctx = String::new();
        ctx.push_str(&format!(
            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
        ));
        if !description.is_empty() {
            ctx.push_str(&format!("- Description: {description}\n"));
        }
        ctx.push_str(&format!("- Domain: {namespace}\n"));
        if !linked_entities.is_empty() {
            ctx.push_str(&format!(
                "- Linked entities: {}\n",
                linked_entities.join(", ")
            ));
        }
        ctx
    } else {
        String::new()
    };

    let prompt = format!(
        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
    );

    // The original body is sent as the LLM input text; the prompt carries the instructions.
    let (value, cost, is_oauth) = match mode {
        EnrichMode::ClaudeCode => {
            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
        }
        EnrichMode::Codex => {
            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
        }
    };

    let enriched_body = value
        .get("enriched_body")
        .and_then(|v| v.as_str())
        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;

    let chars_after = enriched_body.chars().count();

    // Guard 1: the rewrite must keep enough of the original content. The reported
    // score is 1.0 for an `Unchanged` verdict.
    let threshold = preserve_threshold;
    let verdict =
        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
    if !verdict.is_accepted() {
        return Ok(EnrichItemResult::PreservationFailed {
            score: match verdict {
                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
            },
            threshold,
            chars_before,
            chars_after,
        });
    }

    // Guard 2: identical content (same blake3 hash) is not rewritten.
    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
    if old_hash == new_hash {
        return Ok(EnrichItemResult::Skipped {
            reason: format!(
                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
            ),
        });
    }

    // Guard 3: enrichment must grow the body; shorter or equal rewrites are dropped.
    if chars_after <= chars_before {
        return Ok(EnrichItemResult::Skipped {
            reason: format!(
                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
            ),
        });
    }

    persist_enriched_body(
        conn,
        namespace,
        memory_id,
        memory_name,
        enriched_body,
        paths,
    )?;

    Ok(EnrichItemResult::Done {
        memory_id: Some(memory_id),
        entity_id: None,
        entities: 0,
        rels: 0,
        chars_before: Some(chars_before),
        chars_after: Some(chars_after),
        cost,
        is_oauth,
    })
}
2708
2709fn call_reembed(
2710 conn: &Connection,
2711 namespace: &str,
2712 memory_name: &str,
2713 paths: &crate::paths::AppPaths,
2714) -> Result<EnrichItemResult, AppError> {
2715 let (memory_id, body, memory_type): (i64, String, String) = conn
2716 .query_row(
2717 "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2718 FROM memories
2719 WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2720 rusqlite::params![namespace, memory_name],
2721 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2722 )
2723 .map_err(|e| match e {
2724 rusqlite::Error::QueryReturnedNoRows => {
2725 AppError::NotFound(format!("memory '{memory_name}' not found"))
2726 }
2727 other => AppError::Database(other),
2728 })?;
2729
2730 if body.trim().is_empty() {
2731 return Ok(EnrichItemResult::Skipped {
2732 reason: "body is empty".to_string(),
2733 });
2734 }
2735
2736 reembed_memory_vector(
2737 conn,
2738 namespace,
2739 memory_id,
2740 memory_name,
2741 &memory_type,
2742 &body,
2743 paths,
2744 )?;
2745
2746 Ok(EnrichItemResult::Done {
2747 memory_id: Some(memory_id),
2748 entity_id: None,
2749 entities: 0,
2750 rels: 0,
2751 chars_before: Some(body.chars().count()),
2752 chars_after: Some(body.chars().count()),
2753 cost: 0.0,
2754 is_oauth: true,
2755 })
2756}
2757
2758fn scan_operation(
2763 conn: &Connection,
2764 namespace: &str,
2765 args: &EnrichArgs,
2766) -> Result<Vec<String>, AppError> {
2767 let name_filter = resolve_name_filter(args)?;
2769 match args.operation {
2770 EnrichOperation::MemoryBindings => {
2771 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2772 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2773 }
2774 EnrichOperation::EntityDescriptions => {
2775 let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2776 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2777 }
2778 EnrichOperation::BodyEnrich => {
2779 let rows =
2780 scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2781 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2782 }
2783 EnrichOperation::ReEmbed => {
2784 let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
2785 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2786 }
2787 EnrichOperation::WeightCalibrate => {
2788 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2789 Ok(rows
2790 .into_iter()
2791 .map(|(id, _, _, _, _)| id.to_string())
2792 .collect())
2793 }
2794 EnrichOperation::RelationReclassify => {
2795 let rows = scan_generic_relations(conn, namespace, args.limit)?;
2796 Ok(rows
2797 .into_iter()
2798 .map(|(id, _, _, _)| id.to_string())
2799 .collect())
2800 }
2801 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2802 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2803 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2804 }
2805 EnrichOperation::EntityTypeValidate => {
2806 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2807 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2808 }
2809 EnrichOperation::DescriptionEnrich => {
2810 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2811 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2812 }
2813 EnrichOperation::DomainClassify
2814 | EnrichOperation::GraphAudit
2815 | EnrichOperation::DeepResearchSynth
2816 | EnrichOperation::BodyExtract => {
2817 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2818 let sql = format!(
2819 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2820 );
2821 let mut stmt = conn.prepare(&sql)?;
2822 let names = stmt
2823 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2824 .collect::<Result<Vec<_>, _>>()?;
2825 Ok(names)
2826 }
2827 }
2828}
2829
2830fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2836 if let Some(p) = explicit {
2837 if p.exists() {
2838 return Ok(p.to_path_buf());
2839 }
2840 return Err(AppError::Validation(format!(
2841 "Codex binary not found at explicit path: {}",
2842 p.display()
2843 )));
2844 }
2845
2846 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2847 let p = PathBuf::from(&env_path);
2848 if p.exists() {
2849 return Ok(p);
2850 }
2851 }
2852
2853 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2854 if let Some(path_var) = std::env::var_os("PATH") {
2855 for dir in std::env::split_paths(&path_var) {
2856 let candidate = dir.join(name);
2857 if candidate.exists() {
2858 return Ok(crate::extract::llm_embedding::resolve_real_binary(
2859 &candidate,
2860 ));
2861 }
2862 }
2863 }
2864
2865 Err(AppError::Validation(
2866 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2867 ))
2868}
2869
2870fn call_weight_calibrate(
2872 conn: &Connection,
2873 _namespace: &str,
2874 item_key: &str,
2875 binary: &Path,
2876 model: Option<&str>,
2877 timeout: u64,
2878 mode: &EnrichMode,
2879) -> Result<EnrichItemResult, AppError> {
2880 let rel_id: i64 = item_key
2881 .parse()
2882 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2883 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2884 .query_row(
2885 "SELECT e1.name, e2.name, r.relation, r.weight \
2886 FROM relationships r \
2887 JOIN entities e1 ON e1.id = r.source_id \
2888 JOIN entities e2 ON e2.id = r.target_id \
2889 WHERE r.id = ?1",
2890 rusqlite::params![rel_id],
2891 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2892 )
2893 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2894
2895 let input_text = format!(
2896 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2897 );
2898 let (value, cost, is_oauth) = match mode {
2899 EnrichMode::ClaudeCode => call_claude(
2900 binary,
2901 WEIGHT_CALIBRATE_PROMPT,
2902 WEIGHT_CALIBRATE_SCHEMA,
2903 &input_text,
2904 model,
2905 timeout,
2906 )?,
2907 EnrichMode::Codex => call_codex(
2908 binary,
2909 WEIGHT_CALIBRATE_PROMPT,
2910 WEIGHT_CALIBRATE_SCHEMA,
2911 &input_text,
2912 model,
2913 timeout,
2914 )?,
2915 };
2916
2917 let calibrated = value
2918 .get("calibrated_weight")
2919 .and_then(|v| v.as_f64())
2920 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2921
2922 conn.execute(
2923 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2924 rusqlite::params![calibrated, rel_id],
2925 )?;
2926
2927 Ok(EnrichItemResult::Done {
2928 memory_id: None,
2929 entity_id: None,
2930 entities: 0,
2931 rels: 1,
2932 chars_before: None,
2933 chars_after: None,
2934 cost,
2935 is_oauth,
2936 })
2937}
2938
2939fn call_relation_reclassify(
2941 conn: &Connection,
2942 _namespace: &str,
2943 item_key: &str,
2944 binary: &Path,
2945 model: Option<&str>,
2946 timeout: u64,
2947 mode: &EnrichMode,
2948) -> Result<EnrichItemResult, AppError> {
2949 let rel_id: i64 = item_key
2950 .parse()
2951 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2952 let (source_name, target_name, current_relation): (String, String, String) = conn
2953 .query_row(
2954 "SELECT e1.name, e2.name, r.relation \
2955 FROM relationships r \
2956 JOIN entities e1 ON e1.id = r.source_id \
2957 JOIN entities e2 ON e2.id = r.target_id \
2958 WHERE r.id = ?1",
2959 rusqlite::params![rel_id],
2960 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2961 )
2962 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2963
2964 let input_text = format!(
2965 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
2966 );
2967 let (value, cost, is_oauth) = match mode {
2968 EnrichMode::ClaudeCode => call_claude(
2969 binary,
2970 RELATION_RECLASSIFY_PROMPT,
2971 RELATION_RECLASSIFY_SCHEMA,
2972 &input_text,
2973 model,
2974 timeout,
2975 )?,
2976 EnrichMode::Codex => call_codex(
2977 binary,
2978 RELATION_RECLASSIFY_PROMPT,
2979 RELATION_RECLASSIFY_SCHEMA,
2980 &input_text,
2981 model,
2982 timeout,
2983 )?,
2984 };
2985
2986 let new_relation = value
2987 .get("relation")
2988 .and_then(|v| v.as_str())
2989 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
2990 let new_strength = value
2991 .get("strength")
2992 .and_then(|v| v.as_f64())
2993 .unwrap_or(0.5);
2994
2995 conn.execute(
2996 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
2997 rusqlite::params![new_relation, new_strength, rel_id],
2998 )?;
2999
3000 Ok(EnrichItemResult::Done {
3001 memory_id: None,
3002 entity_id: None,
3003 entities: 0,
3004 rels: 1,
3005 chars_before: None,
3006 chars_after: None,
3007 cost,
3008 is_oauth,
3009 })
3010}
3011
3012fn call_entity_connect(
3014 conn: &Connection,
3015 namespace: &str,
3016 item_key: &str,
3017 binary: &Path,
3018 model: Option<&str>,
3019 timeout: u64,
3020 mode: &EnrichMode,
3021) -> Result<EnrichItemResult, AppError> {
3022 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3023 let (e1_id, e1_name, e2_id, e2_name) =
3024 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3025 Some(p) => p,
3026 None => {
3027 return Ok(EnrichItemResult::Skipped {
3028 reason: "pair no longer isolated".into(),
3029 })
3030 }
3031 };
3032 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3033 let (value, cost, is_oauth) = match mode {
3034 EnrichMode::ClaudeCode => call_claude(
3035 binary,
3036 ENTITY_CONNECT_PROMPT,
3037 ENTITY_CONNECT_SCHEMA,
3038 &input_text,
3039 model,
3040 timeout,
3041 )?,
3042 EnrichMode::Codex => call_codex(
3043 binary,
3044 ENTITY_CONNECT_PROMPT,
3045 ENTITY_CONNECT_SCHEMA,
3046 &input_text,
3047 model,
3048 timeout,
3049 )?,
3050 };
3051 let relation = value
3052 .get("relation")
3053 .and_then(|v| v.as_str())
3054 .unwrap_or("none");
3055 if relation == "none" {
3056 return Ok(EnrichItemResult::Skipped {
3057 reason: "LLM determined no relationship".into(),
3058 });
3059 }
3060 let strength = value
3061 .get("strength")
3062 .and_then(|v| v.as_f64())
3063 .unwrap_or(0.5);
3064 conn.execute(
3065 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3066 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3067 )?;
3068 Ok(EnrichItemResult::Done {
3069 memory_id: None,
3070 entity_id: None,
3071 entities: 0,
3072 rels: 1,
3073 chars_before: None,
3074 chars_after: None,
3075 cost,
3076 is_oauth,
3077 })
3078}
3079
3080fn call_entity_type_validate(
3082 conn: &Connection,
3083 _namespace: &str,
3084 item_key: &str,
3085 binary: &Path,
3086 model: Option<&str>,
3087 timeout: u64,
3088 mode: &EnrichMode,
3089) -> Result<EnrichItemResult, AppError> {
3090 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3091 .query_row(
3092 "SELECT id, name, type FROM entities WHERE name = ?1",
3093 rusqlite::params![item_key],
3094 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3095 )
3096 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3097 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3098 let (value, cost, is_oauth) = match mode {
3099 EnrichMode::ClaudeCode => call_claude(
3100 binary,
3101 ENTITY_TYPE_VALIDATE_PROMPT,
3102 ENTITY_TYPE_VALIDATE_SCHEMA,
3103 &input_text,
3104 model,
3105 timeout,
3106 )?,
3107 EnrichMode::Codex => call_codex(
3108 binary,
3109 ENTITY_TYPE_VALIDATE_PROMPT,
3110 ENTITY_TYPE_VALIDATE_SCHEMA,
3111 &input_text,
3112 model,
3113 timeout,
3114 )?,
3115 };
3116 let validated_type = value
3117 .get("validated_type")
3118 .and_then(|v| v.as_str())
3119 .unwrap_or(&ent_type);
3120 let was_correct = value
3121 .get("was_correct")
3122 .and_then(|v| v.as_bool())
3123 .unwrap_or(true);
3124 if !was_correct {
3125 conn.execute(
3126 "UPDATE entities SET type = ?1 WHERE id = ?2",
3127 rusqlite::params![validated_type, ent_id],
3128 )?;
3129 }
3130 Ok(EnrichItemResult::Done {
3131 memory_id: None,
3132 entity_id: Some(ent_id),
3133 entities: 1,
3134 rels: 0,
3135 chars_before: None,
3136 chars_after: None,
3137 cost,
3138 is_oauth,
3139 })
3140}
3141
3142fn call_description_enrich(
3144 conn: &Connection,
3145 _namespace: &str,
3146 item_key: &str,
3147 binary: &Path,
3148 model: Option<&str>,
3149 timeout: u64,
3150 mode: &EnrichMode,
3151) -> Result<EnrichItemResult, AppError> {
3152 let (mem_id, body, old_desc): (i64, String, String) = conn
3153 .query_row(
3154 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3155 rusqlite::params![item_key],
3156 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3157 )
3158 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3159 let snippet: String = body.chars().take(500).collect();
3160 let input_text = format!(
3161 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3162 );
3163 let (value, cost, is_oauth) = match mode {
3164 EnrichMode::ClaudeCode => call_claude(
3165 binary,
3166 DESCRIPTION_ENRICH_PROMPT,
3167 DESCRIPTION_ENRICH_SCHEMA,
3168 &input_text,
3169 model,
3170 timeout,
3171 )?,
3172 EnrichMode::Codex => call_codex(
3173 binary,
3174 DESCRIPTION_ENRICH_PROMPT,
3175 DESCRIPTION_ENRICH_SCHEMA,
3176 &input_text,
3177 model,
3178 timeout,
3179 )?,
3180 };
3181 let new_desc = value
3182 .get("description")
3183 .and_then(|v| v.as_str())
3184 .unwrap_or(&old_desc);
3185 conn.execute(
3186 "UPDATE memories SET description = ?1 WHERE id = ?2",
3187 rusqlite::params![new_desc, mem_id],
3188 )?;
3189 Ok(EnrichItemResult::Done {
3190 memory_id: Some(mem_id),
3191 entity_id: None,
3192 entities: 0,
3193 rels: 0,
3194 chars_before: Some(old_desc.len()),
3195 chars_after: Some(new_desc.len()),
3196 cost,
3197 is_oauth,
3198 })
3199}
3200
3201fn call_domain_classify(
3203 conn: &Connection,
3204 _namespace: &str,
3205 item_key: &str,
3206 binary: &Path,
3207 model: Option<&str>,
3208 timeout: u64,
3209 mode: &EnrichMode,
3210) -> Result<EnrichItemResult, AppError> {
3211 let (mem_id, body, desc): (i64, String, String) = conn
3212 .query_row(
3213 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3214 rusqlite::params![item_key],
3215 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3216 )
3217 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3218 let snippet: String = body.chars().take(500).collect();
3219 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3220 let (value, cost, is_oauth) = match mode {
3221 EnrichMode::ClaudeCode => call_claude(
3222 binary,
3223 DOMAIN_CLASSIFY_PROMPT,
3224 DOMAIN_CLASSIFY_SCHEMA,
3225 &input_text,
3226 model,
3227 timeout,
3228 )?,
3229 EnrichMode::Codex => call_codex(
3230 binary,
3231 DOMAIN_CLASSIFY_PROMPT,
3232 DOMAIN_CLASSIFY_SCHEMA,
3233 &input_text,
3234 model,
3235 timeout,
3236 )?,
3237 };
3238 let domain = value
3239 .get("domain")
3240 .and_then(|v| v.as_str())
3241 .unwrap_or("uncategorized");
3242 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3243 conn.execute(
3244 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3245 rusqlite::params![metadata, mem_id],
3246 )?;
3247 Ok(EnrichItemResult::Done {
3248 memory_id: Some(mem_id),
3249 entity_id: None,
3250 entities: 0,
3251 rels: 0,
3252 chars_before: None,
3253 chars_after: None,
3254 cost,
3255 is_oauth,
3256 })
3257}
3258
3259fn call_graph_audit(
3261 conn: &Connection,
3262 _namespace: &str,
3263 item_key: &str,
3264 binary: &Path,
3265 model: Option<&str>,
3266 timeout: u64,
3267 mode: &EnrichMode,
3268) -> Result<EnrichItemResult, AppError> {
3269 let (mem_id, body, desc): (i64, String, String) = conn
3270 .query_row(
3271 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3272 rusqlite::params![item_key],
3273 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3274 )
3275 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3276 let snippet: String = body.chars().take(500).collect();
3277 let ent_count: i64 = conn
3278 .query_row(
3279 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3280 rusqlite::params![mem_id],
3281 |r| r.get(0),
3282 )
3283 .unwrap_or(0);
3284 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3285 let (value, cost, is_oauth) = match mode {
3286 EnrichMode::ClaudeCode => call_claude(
3287 binary,
3288 GRAPH_AUDIT_PROMPT,
3289 GRAPH_AUDIT_SCHEMA,
3290 &input_text,
3291 model,
3292 timeout,
3293 )?,
3294 EnrichMode::Codex => call_codex(
3295 binary,
3296 GRAPH_AUDIT_PROMPT,
3297 GRAPH_AUDIT_SCHEMA,
3298 &input_text,
3299 model,
3300 timeout,
3301 )?,
3302 };
3303 let issues = value
3304 .get("issues")
3305 .and_then(|v| v.as_array())
3306 .map(|a| a.len())
3307 .unwrap_or(0);
3308 Ok(EnrichItemResult::Done {
3309 memory_id: Some(mem_id),
3310 entity_id: None,
3311 entities: 0,
3312 rels: issues,
3313 chars_before: None,
3314 chars_after: None,
3315 cost,
3316 is_oauth,
3317 })
3318}
3319
3320fn call_deep_research_synth(
3322 conn: &Connection,
3323 namespace: &str,
3324 item_key: &str,
3325 binary: &Path,
3326 model: Option<&str>,
3327 timeout: u64,
3328 mode: &EnrichMode,
3329) -> Result<EnrichItemResult, AppError> {
3330 let (mem_id, body): (i64, String) = conn
3331 .query_row(
3332 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3333 rusqlite::params![item_key],
3334 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3335 )
3336 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3337 let snippet: String = body.chars().take(2000).collect();
3338 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3339 let (value, cost, is_oauth) = match mode {
3340 EnrichMode::ClaudeCode => call_claude(
3341 binary,
3342 DEEP_RESEARCH_SYNTH_PROMPT,
3343 DEEP_RESEARCH_SYNTH_SCHEMA,
3344 &input_text,
3345 model,
3346 timeout,
3347 )?,
3348 EnrichMode::Codex => call_codex(
3349 binary,
3350 DEEP_RESEARCH_SYNTH_PROMPT,
3351 DEEP_RESEARCH_SYNTH_SCHEMA,
3352 &input_text,
3353 model,
3354 timeout,
3355 )?,
3356 };
3357 let mut ent_count = 0usize;
3358 let mut rel_count = 0usize;
3359 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3360 for e in ents {
3361 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3362 let etype_str = e
3363 .get("entity_type")
3364 .and_then(|v| v.as_str())
3365 .unwrap_or("concept");
3366 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3367 if name.len() >= 2 {
3368 let ne = NewEntity {
3369 name: name.to_string(),
3370 entity_type: etype,
3371 description: None,
3372 };
3373 let _ = entities::upsert_entity(conn, namespace, &ne);
3374 ent_count += 1;
3375 }
3376 }
3377 }
3378 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3379 for r in rels {
3380 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3381 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3382 if src.is_empty() || tgt.is_empty() {
3383 continue;
3384 }
3385 let rel = r
3386 .get("relation")
3387 .and_then(|v| v.as_str())
3388 .unwrap_or("related");
3389 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3390 if let (Some(sid), Some(tid)) = (
3391 entities::find_entity_id(conn, namespace, src)?,
3392 entities::find_entity_id(conn, namespace, tgt)?,
3393 ) {
3394 let _ = entities::create_or_fetch_relationship(
3395 conn, namespace, sid, tid, rel, str_, None,
3396 );
3397 rel_count += 1;
3398 }
3399 }
3400 }
3401 Ok(EnrichItemResult::Done {
3402 memory_id: Some(mem_id),
3403 entity_id: None,
3404 entities: ent_count,
3405 rels: rel_count,
3406 chars_before: None,
3407 chars_after: None,
3408 cost,
3409 is_oauth,
3410 })
3411}
3412
3413fn call_body_extract(
3415 conn: &Connection,
3416 _namespace: &str,
3417 item_key: &str,
3418 binary: &Path,
3419 model: Option<&str>,
3420 timeout: u64,
3421 mode: &EnrichMode,
3422) -> Result<EnrichItemResult, AppError> {
3423 let (mem_id, body): (i64, String) = conn
3424 .query_row(
3425 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3426 rusqlite::params![item_key],
3427 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3428 )
3429 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3430 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3431 let (value, cost, is_oauth) = match mode {
3432 EnrichMode::ClaudeCode => call_claude(
3433 binary,
3434 BODY_EXTRACT_PROMPT,
3435 BODY_EXTRACT_SCHEMA,
3436 &input_text,
3437 model,
3438 timeout,
3439 )?,
3440 EnrichMode::Codex => call_codex(
3441 binary,
3442 BODY_EXTRACT_PROMPT,
3443 BODY_EXTRACT_SCHEMA,
3444 &input_text,
3445 model,
3446 timeout,
3447 )?,
3448 };
3449 let restructured = value
3450 .get("restructured_body")
3451 .and_then(|v| v.as_str())
3452 .unwrap_or(&body);
3453 let chars_before = body.len();
3454 let chars_after = restructured.len();
3455 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3456 conn.execute(
3457 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3458 rusqlite::params![restructured, new_hash, mem_id],
3459 )?;
3460 Ok(EnrichItemResult::Done {
3461 memory_id: Some(mem_id),
3462 entity_id: None,
3463 entities: 0,
3464 rels: 0,
3465 chars_before: Some(chars_before),
3466 chars_after: Some(chars_after),
3467 cost,
3468 is_oauth,
3469 })
3470}
3471
3472#[allow(clippy::type_complexity)]
3474fn scan_isolated_entity_pairs(
3475 conn: &Connection,
3476 namespace: &str,
3477 limit: Option<usize>,
3478) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3479 let limit_val = limit.unwrap_or(50) as i64;
3480 let mut stmt = conn.prepare_cached(
3481 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3482 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3483 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3484 (r.source_id = e1.id AND r.target_id = e2.id) OR \
3485 (r.source_id = e2.id AND r.target_id = e1.id)) \
3486 LIMIT ?2",
3487 )?;
3488 let rows = stmt
3489 .query_map(rusqlite::params![namespace, limit_val], |r| {
3490 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3491 })?
3492 .collect::<Result<Vec<_>, _>>()?;
3493 Ok(rows)
3494}
3495
3496fn scan_entities_for_type_validation(
3498 conn: &Connection,
3499 namespace: &str,
3500 limit: Option<usize>,
3501) -> Result<Vec<(i64, String, String)>, AppError> {
3502 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3503 let sql = format!(
3504 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3505 );
3506 let mut stmt = conn.prepare(&sql)?;
3507 let rows = stmt
3508 .query_map(rusqlite::params![namespace], |r| {
3509 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3510 })?
3511 .collect::<Result<Vec<_>, _>>()?;
3512 Ok(rows)
3513}
3514
3515fn scan_generic_descriptions(
3517 conn: &Connection,
3518 namespace: &str,
3519 limit: Option<usize>,
3520) -> Result<Vec<(i64, String, String)>, AppError> {
3521 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3522 let sql = format!(
3523 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3524 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3525 ORDER BY id {limit_clause}"
3526 );
3527 let mut stmt = conn.prepare(&sql)?;
3528 let rows = stmt
3529 .query_map(rusqlite::params![namespace], |r| {
3530 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3531 })?
3532 .collect::<Result<Vec<_>, _>>()?;
3533 Ok(rows)
3534}
3535
3536fn call_codex(
3540 binary: &Path,
3541 prompt: &str,
3542 json_schema: &str,
3543 input_text: &str,
3544 model: Option<&str>,
3545 timeout_secs: u64,
3546) -> Result<(serde_json::Value, f64, bool), AppError> {
3547 use wait_timeout::ChildExt;
3548
3549 super::codex_spawn::validate_codex_model(model)?;
3554 let schema_file = super::codex_spawn::trusted_schema_path()?;
3555
3556 let args = super::codex_spawn::CodexSpawnArgs {
3557 binary,
3558 prompt,
3559 json_schema,
3560 input_text,
3561 model,
3562 timeout_secs,
3563 schema_path: schema_file.clone(),
3564 };
3565 let mut cmd = super::codex_spawn::build_codex_command(&args);
3566
3567 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3568 AppError::Io(std::io::Error::new(
3569 e.kind(),
3570 format!("failed to spawn codex: {e}"),
3571 ))
3572 })?;
3573
3574 let full_prompt = format!("{prompt}\n\n{input_text}");
3575 let stdin_bytes = full_prompt.into_bytes();
3576 let mut child_stdin = child
3577 .stdin
3578 .take()
3579 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3580 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3581 child_stdin.write_all(&stdin_bytes)?;
3582 drop(child_stdin);
3583 Ok(())
3584 });
3585
3586 let start = std::time::Instant::now();
3587 let timeout = std::time::Duration::from_secs(timeout_secs);
3588 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3589 let _ = std::fs::remove_file(&schema_file);
3590
3591 match status {
3592 Some(exit_status) => {
3593 stdin_thread
3594 .join()
3595 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3596 .map_err(AppError::Io)?;
3597
3598 tracing::debug!(
3599 target: "process",
3600 exit_code = ?exit_status.code(),
3601 elapsed_ms = start.elapsed().as_millis() as u64,
3602 "external process completed"
3603 );
3604
3605 let mut stdout_buf = Vec::new();
3606 if let Some(mut out) = child.stdout.take() {
3607 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3608 }
3609 if !exit_status.success() {
3610 let mut stderr_buf = Vec::new();
3611 if let Some(mut err) = child.stderr.take() {
3612 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3613 }
3614 let stderr_str = String::from_utf8_lossy(&stderr_buf);
3615 tracing::warn!(
3616 target: "enrich",
3617 exit_code = ?exit_status.code(),
3618 stderr = %stderr_str.trim(),
3619 "codex process failed"
3620 );
3621 return Err(AppError::Validation(format!(
3622 "codex exited with code {:?}: {}",
3623 exit_status.code(),
3624 stderr_str.trim()
3625 )));
3626 }
3627 let stdout_str = String::from_utf8(stdout_buf)
3628 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3629 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3632 let value: serde_json::Value =
3638 serde_json::from_str(&result.last_agent_text).map_err(|e| {
3639 AppError::Validation(format!(
3640 "codex agent_message is not valid JSON: {e}; raw={}",
3641 result.last_agent_text
3642 ))
3643 })?;
3644 Ok((value, 0.0, false))
3645 }
3646 None => {
3647 let _ = child.kill();
3648 let _ = child.wait();
3649 let _ = stdin_thread.join();
3650 Err(AppError::Validation(format!(
3651 "codex timed out after {timeout_secs} seconds"
3652 )))
3653 }
3654 }
3655}
3656
3657#[cfg(test)]
3662mod tests {
3663 use super::*;
3664 use rusqlite::Connection;
3665 #[cfg(unix)]
3666 use std::os::unix::fs::PermissionsExt;
3667
3668 fn open_test_db() -> Connection {
3670 let conn = Connection::open_in_memory().expect("in-memory db");
3671 conn.execute_batch(
3672 "CREATE TABLE memories (
3673 id INTEGER PRIMARY KEY AUTOINCREMENT,
3674 namespace TEXT NOT NULL DEFAULT 'global',
3675 name TEXT NOT NULL,
3676 type TEXT NOT NULL DEFAULT 'note',
3677 description TEXT NOT NULL DEFAULT '',
3678 body TEXT NOT NULL DEFAULT '',
3679 body_hash TEXT NOT NULL DEFAULT '',
3680 session_id TEXT,
3681 source TEXT NOT NULL DEFAULT 'agent',
3682 metadata TEXT NOT NULL DEFAULT '{}',
3683 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
3684 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
3685 deleted_at INTEGER,
3686 UNIQUE(namespace, name)
3687 );
3688 CREATE TABLE entities (
3689 id INTEGER PRIMARY KEY AUTOINCREMENT,
3690 namespace TEXT NOT NULL DEFAULT 'global',
3691 name TEXT NOT NULL,
3692 type TEXT NOT NULL DEFAULT 'concept',
3693 description TEXT,
3694 degree INTEGER NOT NULL DEFAULT 0,
3695 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
3696 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
3697 UNIQUE(namespace, name)
3698 );
3699 CREATE TABLE memory_entities (
3700 memory_id INTEGER NOT NULL,
3701 entity_id INTEGER NOT NULL,
3702 PRIMARY KEY (memory_id, entity_id)
3703 );
3704 CREATE TABLE relationships (
3705 id INTEGER PRIMARY KEY AUTOINCREMENT,
3706 namespace TEXT NOT NULL DEFAULT 'global',
3707 source_id INTEGER NOT NULL,
3708 target_id INTEGER NOT NULL,
3709 relation TEXT NOT NULL,
3710 weight REAL NOT NULL DEFAULT 0.5,
3711 description TEXT,
3712 UNIQUE(source_id, target_id, relation)
3713 );
3714 CREATE TABLE memory_embeddings (
3715 memory_id INTEGER PRIMARY KEY,
3716 namespace TEXT NOT NULL,
3717 embedding BLOB NOT NULL,
3718 source TEXT NOT NULL,
3719 model TEXT NOT NULL DEFAULT '',
3720 dim INTEGER NOT NULL DEFAULT 384,
3721 created_at INTEGER NOT NULL DEFAULT (unixepoch())
3722 );",
3723 )
3724 .expect("schema creation must succeed");
3725 conn
3726 }
3727
3728 #[test]
3729 fn scan_unbound_memories_finds_memories_without_bindings() {
3730 let conn = open_test_db();
3731 conn.execute(
3732 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
3733 [],
3734 )
3735 .unwrap();
3736
3737 let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
3738 assert_eq!(results.len(), 1);
3739 assert_eq!(results[0].1, "test-mem");
3740 }
3741
3742 #[test]
3743 fn scan_unbound_memories_excludes_bound_memories() {
3744 let conn = open_test_db();
3745 conn.execute(
3746 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
3747 [],
3748 )
3749 .unwrap();
3750 let mem_id: i64 = conn
3751 .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
3752 r.get(0)
3753 })
3754 .unwrap();
3755 conn.execute(
3756 "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
3757 [],
3758 )
3759 .unwrap();
3760 let ent_id: i64 = conn
3761 .query_row(
3762 "SELECT id FROM entities WHERE name='some-entity'",
3763 [],
3764 |r| r.get(0),
3765 )
3766 .unwrap();
3767 conn.execute(
3768 "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
3769 rusqlite::params![mem_id, ent_id],
3770 )
3771 .unwrap();
3772
3773 let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
3774 assert!(results.is_empty(), "bound memory must not appear in scan");
3775 }
3776
3777 #[test]
3778 fn scan_entities_without_description_finds_null_description() {
3779 let conn = open_test_db();
3780 conn.execute(
3781 "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
3782 [],
3783 )
3784 .unwrap();
3785
3786 let results = scan_entities_without_description(&conn, "global", None).unwrap();
3787 assert_eq!(results.len(), 1);
3788 assert_eq!(results[0].1, "my-tool");
3789 }
3790
3791 #[test]
3792 fn scan_entities_without_description_excludes_entities_with_description() {
3793 let conn = open_test_db();
3794 conn.execute(
3795 "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
3796 [],
3797 )
3798 .unwrap();
3799
3800 let results = scan_entities_without_description(&conn, "global", None).unwrap();
3801 assert!(
3802 results.is_empty(),
3803 "entity with description must not appear"
3804 );
3805 }
3806
3807 #[test]
3808 fn scan_short_body_memories_finds_short_bodies() {
3809 let conn = open_test_db();
3810 conn.execute(
3811 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
3812 [],
3813 )
3814 .unwrap();
3815
3816 let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
3817 assert_eq!(results.len(), 1);
3818 assert_eq!(results[0].1, "short-mem");
3819 }
3820
3821 #[test]
3822 fn scan_short_body_memories_excludes_long_bodies() {
3823 let conn = open_test_db();
3824 let long_body = "a".repeat(1000);
3825 conn.execute(
3826 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
3827 rusqlite::params![long_body],
3828 )
3829 .unwrap();
3830
3831 let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
3832 assert!(results.is_empty(), "long memory must not appear in scan");
3833 }
3834
3835 #[test]
3836 fn scan_respects_limit() {
3837 let conn = open_test_db();
3838 for i in 0..5 {
3839 conn.execute(
3840 &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
3841 [],
3842 )
3843 .unwrap();
3844 }
3845
3846 let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
3847 assert_eq!(results.len(), 3, "limit must be respected");
3848 }
3849
3850 #[test]
3851 fn scan_memories_without_embeddings_finds_only_missing_rows() {
3852 let conn = open_test_db();
3853 conn.execute(
3854 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
3855 [],
3856 )
3857 .unwrap();
3858 conn.execute(
3859 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
3860 [],
3861 )
3862 .unwrap();
3863 let memory_id: i64 = conn
3864 .query_row(
3865 "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
3866 [],
3867 |r| r.get(0),
3868 )
3869 .unwrap();
3870 let embedding = vec![0.0_f32; crate::constants::EMBEDDING_DIM];
3871 memories::upsert_vec(
3872 &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
3873 )
3874 .unwrap();
3875
3876 let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
3877 assert_eq!(results.len(), 1);
3878 assert_eq!(results[0].1, "missing-vec");
3879 }
3880
3881 #[test]
3882 fn scan_memories_without_embeddings_respects_name_filter() {
3883 let conn = open_test_db();
3884 conn.execute(
3885 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
3886 [],
3887 )
3888 .unwrap();
3889 conn.execute(
3890 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
3891 [],
3892 )
3893 .unwrap();
3894
3895 let results =
3896 scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
3897 .unwrap();
3898 assert_eq!(results.len(), 1);
3899 assert_eq!(results[0].1, "match-me");
3900 }
3901
3902 #[test]
3903 fn queue_db_schema_creates_correctly() {
3904 let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
3905 let conn = open_queue_db(&tmp_path).expect("queue db must open");
3906 let count: i64 = conn
3907 .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
3908 .unwrap();
3909 assert_eq!(count, 0);
3910 let _ = std::fs::remove_file(&tmp_path);
3911 }
3912
3913 #[test]
3914 fn parse_claude_output_valid_bindings() {
3915 let output = r#"[
3916 {"type":"system","subtype":"init"},
3917 {"type":"result","is_error":false,"total_cost_usd":0.01,
3918 "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
3919 ]"#;
3920 let result = crate::commands::claude_runner::parse_claude_output(output)
3921 .expect("must parse successfully");
3922 assert!(result.value.get("entities").is_some());
3923 assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
3924 assert!(!result.is_oauth);
3925 }
3926
3927 #[test]
3928 fn parse_claude_output_detects_oauth() {
3929 let output = r#"[
3930 {"type":"system","subtype":"init","apiKeySource":"none"},
3931 {"type":"result","is_error":false,"total_cost_usd":0.0,
3932 "structured_output":{"entities":[],"relationships":[]}}
3933 ]"#;
3934 let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
3935 assert!(result.is_oauth);
3936 }
3937
3938 #[test]
3939 fn parse_claude_output_rate_limit_returns_error() {
3940 let output = r#"[
3941 {"type":"system","subtype":"init"},
3942 {"type":"result","is_error":true,"error":"rate_limit exceeded"}
3943 ]"#;
3944 let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
3945 assert!(matches!(err, AppError::RateLimited { .. }));
3946 }
3947
3948 #[test]
3949 fn parse_claude_output_auth_error() {
3950 let output = r#"[
3951 {"type":"system","subtype":"init"},
3952 {"type":"result","is_error":true,"error":"authentication failed"}
3953 ]"#;
3954 let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
3955 assert!(format!("{err}").contains("authentication failed"));
3956 }
3957
3958 #[cfg(unix)]
3959 #[test]
3960 fn call_codex_returns_raw_json_for_body_enrich_schema() {
3961 let tmp = tempfile::tempdir().expect("tempdir");
3962 let binary = tmp.path().join("codex-mock");
3963 std::fs::write(
3964 &binary,
3965 r#"#!/usr/bin/env bash
3966set -euo pipefail
3967cat <<'JSONL'
3968{"type":"thread.started","thread_id":"mock-thread-0"}
3969{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
3970{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
3971JSONL
3972"#,
3973 )
3974 .expect("mock codex write");
3975 let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
3976 perms.set_mode(0o755);
3977 std::fs::set_permissions(&binary, perms).expect("chmod");
3978
3979 let (value, cost, is_oauth) =
3980 call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
3981 .expect("call_codex must accept body-enrich payload");
3982
3983 assert_eq!(value["enriched_body"], "expanded body");
3984 assert_eq!(cost, 0.0);
3985 assert!(!is_oauth);
3986 }
3987
3988 #[test]
3989 fn dry_run_emits_preview_without_calling_llm() {
3990 let conn = open_test_db();
3995 conn.execute(
3996 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
3997 [],
3998 )
3999 .unwrap();
4000
4001 let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
4002 assert_eq!(results.len(), 1);
4003 assert_eq!(results[0].1, "dry-mem");
4004 }
4007
4008 #[test]
4009 fn persist_entity_description_updates_db() {
4010 let conn = open_test_db();
4011 conn.execute(
4012 "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
4013 [],
4014 )
4015 .unwrap();
4016 let eid: i64 = conn
4017 .query_row(
4018 "SELECT id FROM entities WHERE name='tokio-runtime'",
4019 [],
4020 |r| r.get(0),
4021 )
4022 .unwrap();
4023
4024 persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();
4025
4026 let desc: String = conn
4027 .query_row(
4028 "SELECT description FROM entities WHERE id=?1",
4029 rusqlite::params![eid],
4030 |r| r.get(0),
4031 )
4032 .unwrap();
4033 assert_eq!(desc, "Async runtime for Rust applications");
4034 }
4035
4036 #[test]
4037 fn bindings_schema_is_valid_json() {
4038 let _: serde_json::Value =
4039 serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
4040 }
4041
4042 #[test]
4043 fn entity_description_schema_is_valid_json() {
4044 let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
4045 .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
4046 }
4047
4048 #[test]
4049 fn body_enrich_schema_is_valid_json() {
4050 let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
4051 .expect("BODY_ENRICH_SCHEMA must be valid JSON");
4052 }
4053}