1use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::time::Instant;
38
/// Default file name of the SQLite work-queue database (the queue schema is
/// created by `open_queue_db`).
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default wait after a rate limit is hit.
/// NOTE(review): unit is not visible here — presumably seconds; confirm at the use site.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default for `--min-output-chars` (body-enrich lower length bound).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default for `--max-output-chars` (body-enrich upper length bound).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
47
/// JSON Schema for the memory-bindings extraction response.
///
/// The response must contain two arrays:
/// * `entities` — objects with `name` and an `entity_type` restricted to a
///   fixed enum;
/// * `relationships` — objects with `source`, `target`, a `relation` restricted
///   to a fixed enum, and a `strength` in `[0, 1]`.
///
/// Every object sets `additionalProperties: false`. The field names match the
/// `EntityItem` / `RelItem` structs deserialized in `persist_memory_bindings`.
const BINDINGS_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "entities": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": { "type": "string" },
                    "entity_type": {
                        "type": "string",
                        "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
                    }
                },
                "required": ["name", "entity_type"],
                "additionalProperties": false
            }
        },
        "relationships": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "source": { "type": "string" },
                    "target": { "type": "string" },
                    "relation": {
                        "type": "string",
                        "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
                    },
                    "strength": { "type": "number", "minimum": 0, "maximum": 1 }
                },
                "required": ["source","target","relation","strength"],
                "additionalProperties": false
            }
        }
    },
    "required": ["entities","relationships"],
    "additionalProperties": false
}"#;
91
/// JSON Schema for an entity-description response: a single required
/// `description` string and no other keys.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "description": { "type": "string" }
    },
    "required": ["description"],
    "additionalProperties": false
}"#;

/// JSON Schema for a body-enrich response: a single required `enriched_body`
/// string and no other keys.
const BODY_ENRICH_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "enriched_body": { "type": "string" }
    },
    "required": ["enriched_body"],
    "additionalProperties": false
}"#;
109
/// Prompt for the weight-calibrate audit (pairing with
/// `EnrichOperation::WeightCalibrate` inferred from naming).
///
/// It asks the model to re-rate a relationship weight against a fixed
/// 0.9 / 0.7 / 0.5 / 0.3 scale.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// Response schema for [`WEIGHT_CALIBRATE_PROMPT`]: a `calibrated_weight` in
/// `[0, 1]` plus free-text `reasoning`.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
        "reasoning": { "type": "string" }
    },
    "required": ["calibrated_weight", "reasoning"],
    "additionalProperties": false
}"#;
128
/// Prompt for reclassifying a relationship that uses a generic relation type.
///
/// It lists the eleven canonical relations with a one-line meaning each and
/// asks the model to pick exactly one.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// Response schema for [`RELATION_RECLASSIFY_PROMPT`].
///
/// NOTE(review): `relation` is an unconstrained string here, unlike the enum
/// in `BINDINGS_SCHEMA`, so callers should validate it before persisting.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "relation": { "type": "string" },
        "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
        "reasoning": { "type": "string" }
    },
    "required": ["relation", "strength", "reasoning"],
    "additionalProperties": false
}"#;
155
/// Prompt asking whether two currently unconnected entities should be linked.
/// The model answers with a canonical relation or the literal `"none"`.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// Response schema for [`ENTITY_CONNECT_PROMPT`]. `relation` may be `"none"`,
/// which callers must treat as "do not create an edge".
const ENTITY_CONNECT_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "relation": { "type": "string" },
        "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
        "reasoning": { "type": "string" }
    },
    "required": ["relation", "strength", "reasoning"],
    "additionalProperties": false
}"#;
172
/// Prompt asking the model to confirm or correct an entity's type.
/// It lists the same ten types as the `entity_type` enum in `BINDINGS_SCHEMA`.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// Response schema for [`ENTITY_TYPE_VALIDATE_PROMPT`]: the validated type,
/// whether the original type was already correct, and reasoning.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "validated_type": { "type": "string" },
        "was_correct": { "type": "boolean" },
        "reasoning": { "type": "string" }
    },
    "required": ["validated_type", "was_correct", "reasoning"],
    "additionalProperties": false
}"#;
189
/// Prompt for replacing a generic or auto-generated memory description with
/// a concise 10-20 word semantic one. It includes a BAD/GOOD example pair.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// Response schema for [`DESCRIPTION_ENRICH_PROMPT`]: new `description` plus `reasoning`.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "description": { "type": "string" },
        "reasoning": { "type": "string" }
    },
    "required": ["description", "reasoning"],
    "additionalProperties": false
}"#;
205
/// Prompt for classifying a memory into a kebab-case domain name (2-4 words).
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// Response schema for [`DOMAIN_CLASSIFY_PROMPT`].
///
/// It also requires a `confidence` in `[0, 1]`, which the prompt text does not
/// mention; the model learns about that field from the schema alone.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "domain": { "type": "string" },
        "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
        "reasoning": { "type": "string" }
    },
    "required": ["domain", "confidence", "reasoning"],
    "additionalProperties": false
}"#;
220
/// Prompt for auditing one memory and its entity bindings for quality issues.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// Response schema for [`GRAPH_AUDIT_PROMPT`].
///
/// It contains a `quality_score` in `[0, 1]`, a list of `{kind, detail}`
/// issues, and `reasoning`.
///
/// Every object, including the nested issue items, sets
/// `"additionalProperties": false`. This matches `BINDINGS_SCHEMA`, and strict
/// structured-output backends reject object schemas that omit it.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
        "issues": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "kind": { "type": "string" },
                    "detail": { "type": "string" }
                },
                "required": ["kind", "detail"],
                "additionalProperties": false
            }
        },
        "reasoning": { "type": "string" }
    },
    "required": ["quality_score", "issues", "reasoning"],
    "additionalProperties": false
}"#;
236
/// Prompt for synthesizing entities, relationships and a summary from a memory body.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// Response schema for [`DEEP_RESEARCH_SYNTH_PROMPT`]: `entities`
/// (`name`, `entity_type`), `relationships` (`source`, `target`, `relation`,
/// `strength`), and a `summary` string.
///
/// The nested item objects set `"additionalProperties": false` like the
/// top-level object does. This matches `BINDINGS_SCHEMA`, and strict
/// structured-output backends reject object schemas that omit it.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "entities": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": { "type": "string" },
                    "entity_type": { "type": "string" }
                },
                "required": ["name", "entity_type"],
                "additionalProperties": false
            }
        },
        "relationships": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "source": { "type": "string" },
                    "target": { "type": "string" },
                    "relation": { "type": "string" },
                    "strength": { "type": "number" }
                },
                "required": ["source", "target", "relation", "strength"],
                "additionalProperties": false
            }
        },
        "summary": { "type": "string" }
    },
    "required": ["entities", "relationships", "summary"],
    "additionalProperties": false
}"#;
253
/// Prompt for restructuring an unstructured memory body (notes, transcript)
/// into clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// Response schema for [`BODY_EXTRACT_PROMPT`]: `restructured_body` plus `changes_summary`.
const BODY_EXTRACT_SCHEMA: &str = r#"{
    "type": "object",
    "properties": {
        "restructured_body": { "type": "string" },
        "changes_summary": { "type": "string" }
    },
    "required": ["restructured_body", "changes_summary"],
    "additionalProperties": false
}"#;
268
/// Extraction prompt paired (by name) with [`BINDINGS_SCHEMA`].
///
/// The relation list and the 0.9/0.7/0.5/0.3 strength scale mirror the
/// schema's enum and the weight-calibration prompt.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Leading part of the entity-description prompt. It ends with
/// `"Entity name: "`, so the caller presumably appends the entity name
/// (and type) after it.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading part of the body-enrich prompt. The memory body to enrich is
/// presumably appended after the trailing blank line.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
287
// The enrichment operation selected with `--operation`.
//
// Plain `//` comments are used on purpose: clap's `ValueEnum` derive turns `///`
// doc comments on variants into `--help` text, which would change CLI output.
// Serde serializes variants in kebab-case (`memory-bindings`, `re-embed`, ...),
// which matches the spellings used in the `enrich` help examples.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Add missing entity bindings to unbound memories (see BINDINGS_PROMPT/SCHEMA).
    MemoryBindings,
    // Fill empty entity descriptions.
    EntityDescriptions,
    // Expand short memory bodies.
    BodyEnrich,
    // Rebuild only missing memory embeddings, without rewriting bodies.
    ReEmbed,
    // Re-rate relationship weights (WEIGHT_CALIBRATE_*, pairing inferred from naming).
    WeightCalibrate,
    // Replace generic relation types (RELATION_RECLASSIFY_*).
    RelationReclassify,
    // Propose relationships between unconnected entities (ENTITY_CONNECT_*).
    EntityConnect,
    // Confirm or correct entity types (ENTITY_TYPE_VALIDATE_*).
    EntityTypeValidate,
    // Replace generic memory descriptions (DESCRIPTION_ENRICH_*).
    DescriptionEnrich,
    // NOTE(review): no prompt/schema for this operation is visible in this chunk.
    CrossDomainBridges,
    // Assign a primary domain to a memory (DOMAIN_CLASSIFY_*).
    DomainClassify,
    // Audit a memory and its bindings for quality issues (GRAPH_AUDIT_*).
    GraphAudit,
    // Synthesize entities/relationships/summary from a body (DEEP_RESEARCH_SYNTH_*).
    DeepResearchSynth,
    // Restructure an unstructured body into markdown (BODY_EXTRACT_*).
    BodyExtract,
}
325
// Which LLM command-line provider performs the enrichment.
// `//` comments on purpose: `///` would become clap `--help` text for the values.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Claude Code CLI (located via `find_claude_binary`); `--mode` default.
    ClaudeCode,
    // Codex CLI (located via `find_codex_binary`).
    Codex,
}
334
335impl std::fmt::Display for EnrichMode {
336 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337 match self {
338 EnrichMode::ClaudeCode => write!(f, "claude-code"),
339 EnrichMode::Codex => write!(f, "codex"),
340 }
341 }
342}
343
344#[derive(clap::Args)]
346#[command(
347 about = "Enrich graph memories and entities using an LLM provider",
348 after_long_help = "EXAMPLES:\n \
349 # Add missing entity bindings to all unbound memories\n \
350 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
351 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
352 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
353 # Expand short memory bodies (GAP-18)\n \
354 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
355 # Rebuild only missing memory embeddings without rewriting bodies\n \
356 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
357 # Resume an interrupted body-enrich run\n \
358 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
359 # Retry only failed items from a previous run\n \
360 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
361 EXIT CODES:\n \
362 0 success\n \
363 1 validation error (bad args, binary not found)\n \
364 14 I/O error"
365)]
366pub struct EnrichArgs {
367 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
369 pub operation: EnrichOperation,
370
371 #[arg(long, value_enum, default_value = "claude-code")]
373 pub mode: EnrichMode,
374
375 #[arg(long, value_name = "N")]
377 pub limit: Option<usize>,
378
379 #[arg(long)]
381 pub dry_run: bool,
382
383 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
385 pub namespace: Option<String>,
386
387 #[arg(long, value_name = "PATH")]
390 pub claude_binary: Option<PathBuf>,
391
392 #[arg(long, value_name = "MODEL")]
394 pub claude_model: Option<String>,
395
396 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
398 pub claude_timeout: u64,
399
400 #[arg(long, value_name = "PATH")]
403 pub codex_binary: Option<PathBuf>,
404
405 #[arg(long, value_name = "MODEL")]
407 pub codex_model: Option<String>,
408
409 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
411 pub codex_timeout: u64,
412
413 #[arg(long, value_name = "USD")]
416 pub max_cost_usd: Option<f64>,
417
418 #[arg(long)]
421 pub resume: bool,
422
423 #[arg(long)]
425 pub retry_failed: bool,
426
427 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
430 pub min_output_chars: usize,
431
432 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
434 pub max_output_chars: usize,
435
436 #[arg(long, default_value_t = true)]
438 pub preserve_check: bool,
439
440 #[arg(long, value_name = "PATH")]
442 pub prompt_template: Option<PathBuf>,
443
444 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
448 pub llm_parallelism: u32,
449
450 #[arg(long)]
453 pub json: bool,
454
455 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
457 pub db: Option<String>,
458
459 #[arg(long, value_name = "SECONDS")]
462 pub wait_job_singleton: Option<u64>,
463
464 #[arg(long, default_value_t = false)]
468 pub force_job_singleton: bool,
469
470 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
474 pub names: Vec<String>,
475
476 #[arg(long, value_name = "PATH")]
480 pub names_file: Option<PathBuf>,
481
482 #[arg(long, default_value_t = false)]
486 pub preflight_check: bool,
487
488 #[arg(long, value_enum)]
492 pub fallback_mode: Option<EnrichMode>,
493
494 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
497 pub rate_limit_buffer: u64,
498
499 #[arg(long, default_value_t = true)]
503 pub max_load_check: bool,
504
505 #[arg(long, value_name = "N", default_value_t = 5)]
508 pub circuit_breaker_threshold: u32,
509
510 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
517 pub preserve_threshold: f64,
518
519 #[arg(long, default_value_t = true)]
524 pub codex_model_validate: bool,
525
526 #[arg(long, value_name = "MODEL")]
531 pub codex_model_fallback: Option<String>,
532}
533
/// Run-lifecycle event serialized to JSON (presumably emitted as one line via
/// `emit_json`). Fields that are `None` are omitted from the output.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Name of the phase being reported.
    phase: &'a str,
    /// Resolved path of the LLM CLI binary, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Version string of the LLM CLI, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of items in the run.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still pending.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Configured number of concurrent LLM calls.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
557
/// Per-item progress record serialized to JSON (presumably emitted as one line
/// via `emit_json`). Fields that are `None` are omitted from the output.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Key identifying the processed item (cf. `queue.item_key`).
    item: &'a str,
    /// Outcome label for this item; the set of values is defined by the caller.
    status: &'a str,
    /// Memory id, for memory-scoped operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity id, for entity-scoped operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities written for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships written for this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before rewriting (body-rewriting operations).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after rewriting.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// LLM cost attributed to this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item in the run. NOTE(review): 0- vs 1-based not visible here.
    index: usize,
    /// Total number of items in the run.
    total: usize,
}
584
/// Final run summary serialized to JSON.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field. Presumably always `true`, so JSON-line consumers can tell
    /// the summary apart from item events — TODO confirm at the construction site.
    summary: bool,
    /// Operation name as reported by the caller.
    operation: String,
    /// Number of items selected for the run.
    items_total: usize,
    /// Items processed successfully.
    completed: usize,
    /// Items that failed.
    failed: usize,
    /// Items skipped.
    skipped: usize,
    /// Accumulated LLM cost in USD.
    cost_usd: f64,
    /// Total run wall-clock time in milliseconds.
    elapsed_ms: u64,
}
596
597use crate::output::emit_json_line as emit_json;
598
599fn open_queue_db(path: &str) -> Result<Connection, AppError> {
614 let conn = Connection::open(path)?;
615 conn.pragma_update(None, "journal_mode", "wal")?;
616 conn.execute_batch(
617 "CREATE TABLE IF NOT EXISTS queue (
618 id INTEGER PRIMARY KEY AUTOINCREMENT,
619 item_key TEXT NOT NULL UNIQUE,
620 item_type TEXT NOT NULL DEFAULT 'memory',
621 status TEXT NOT NULL DEFAULT 'pending',
622 memory_id INTEGER,
623 entity_id INTEGER,
624 entities INTEGER DEFAULT 0,
625 rels INTEGER DEFAULT 0,
626 error TEXT,
627 cost_usd REAL DEFAULT 0.0,
628 attempt INTEGER DEFAULT 0,
629 elapsed_ms INTEGER,
630 created_at TEXT DEFAULT (datetime('now')),
631 done_at TEXT
632 );
633 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
634 )?;
635 Ok(conn)
636}
637
638fn call_claude(
646 binary: &Path,
647 prompt: &str,
648 json_schema: &str,
649 input_text: &str,
650 model: Option<&str>,
651 timeout_secs: u64,
652) -> Result<(serde_json::Value, f64, bool), AppError> {
653 let result = crate::commands::claude_runner::run_claude(
654 binary,
655 prompt,
656 json_schema,
657 input_text,
658 model,
659 timeout_secs,
660 7,
661 )?;
662 Ok((result.value, result.cost_usd, result.is_oauth))
663}
664
/// Result of [`run_preflight_probe`].
enum PreflightOutcome {
    /// The probe process exited successfully.
    Healthy,
    /// The probe failed and its stderr contained a known rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the failed probe.
        reason: String,
        /// Fixed hint telling the user how to proceed.
        suggestion: &'static str,
    },
    /// Any other failure: binary lookup, spawn/wait error, timeout, or a
    /// non-zero exit without a rate-limit marker.
    Error(AppError),
}
683
684fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
692 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
693
694 match args.mode {
695 EnrichMode::ClaudeCode => {
696 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
697 Ok(b) => b,
698 Err(e) => return PreflightOutcome::Error(e),
699 };
700 let mut cmd = std::process::Command::new(&bin);
701 cmd.env_clear();
702 for var in &["PATH", "HOME", "USER"] {
703 if let Ok(val) = std::env::var(var) {
704 cmd.env(var, val);
705 }
706 }
707 cmd.arg("-p")
708 .arg("ping")
709 .arg("--max-turns")
710 .arg("1")
711 .arg("--strict-mcp-config")
712 .arg("--mcp-config")
713 .arg("{}")
714 .arg("--dangerously-skip-permissions")
715 .arg("--settings")
716 .arg("{\"hooks\":{}}")
717 .arg("--output-format")
718 .arg("json")
719 .stdin(std::process::Stdio::null())
720 .stdout(std::process::Stdio::piped())
721 .stderr(std::process::Stdio::piped());
722
723 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
724 Ok(c) => c,
725 Err(e) => {
726 return PreflightOutcome::Error(AppError::Io(e));
727 }
728 };
729 let output = match wait_with_timeout(child, timeout) {
730 Ok(out) => out,
731 Err(e) => return PreflightOutcome::Error(e),
732 };
733 if !output.status.success() {
734 let stderr = String::from_utf8_lossy(&output.stderr);
735 if stderr.contains("hit your session limit")
736 || stderr.contains("rate_limit")
737 || stderr.contains("429")
738 {
739 return PreflightOutcome::RateLimited {
740 reason: stderr.trim().to_string(),
741 suggestion:
742 "wait for the OAuth window to reset or use --fallback-mode codex",
743 };
744 }
745 return PreflightOutcome::Error(AppError::Validation(format!(
746 "preflight probe failed: {stderr}",
747 stderr = stderr.trim()
748 )));
749 }
750 PreflightOutcome::Healthy
751 }
752 EnrichMode::Codex => {
753 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
754 Ok(b) => b,
755 Err(e) => return PreflightOutcome::Error(e),
756 };
757 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
758 .map_err(PreflightOutcome::Error)
759 .ok();
760 let schema = "{}";
761 let schema_path = match super::codex_spawn::trusted_schema_path() {
762 Ok(p) => p,
763 Err(e) => return PreflightOutcome::Error(e),
764 };
765 let spawn_args = super::codex_spawn::CodexSpawnArgs {
766 binary: &bin,
767 prompt: "ping",
768 json_schema: schema,
769 input_text: "",
770 model: args.codex_model.as_deref(),
771 timeout_secs: args.rate_limit_buffer.max(60),
772 schema_path: schema_path.clone(),
773 };
774 let mut cmd = super::codex_spawn::build_codex_command(&spawn_args);
775 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
776 Ok(c) => c,
777 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
778 };
779 let output = match wait_with_timeout(child, timeout) {
780 Ok(out) => out,
781 Err(e) => return PreflightOutcome::Error(e),
782 };
783 let _ = std::fs::remove_file(&schema_path);
784 if !output.status.success() {
785 let stderr = String::from_utf8_lossy(&output.stderr);
786 if stderr.contains("rate_limit")
787 || stderr.contains("429")
788 || stderr.contains("Too Many Requests")
789 {
790 return PreflightOutcome::RateLimited {
791 reason: stderr.trim().to_string(),
792 suggestion: "wait for the rate-limit window to reset",
793 };
794 }
795 return PreflightOutcome::Error(AppError::Validation(format!(
796 "preflight probe failed: {stderr}",
797 stderr = stderr.trim()
798 )));
799 }
800 PreflightOutcome::Healthy
801 }
802 }
803}
804
805fn wait_with_timeout(
807 mut child: std::process::Child,
808 timeout: std::time::Duration,
809) -> Result<std::process::Output, AppError> {
810 use wait_timeout::ChildExt;
811 let start = std::time::Instant::now();
812 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
813 if status.is_none() {
814 let _ = child.kill();
815 let _ = child.wait();
816 return Err(AppError::Validation(format!(
817 "preflight probe timed out after {}s",
818 start.elapsed().as_secs()
819 )));
820 }
821 let mut stdout = Vec::new();
822 if let Some(mut out) = child.stdout.take() {
823 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
824 }
825 let mut stderr = Vec::new();
826 if let Some(mut err) = child.stderr.take() {
827 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
828 }
829 let exit = status.unwrap();
830 Ok(std::process::Output {
831 status: exit,
832 stdout,
833 stderr,
834 })
835}
836
837fn scan_unbound_memories(
848 conn: &Connection,
849 namespace: &str,
850 limit: Option<usize>,
851 name_filter: &[String],
852) -> Result<Vec<(i64, String, String)>, AppError> {
853 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
854
855 if name_filter.is_empty() {
856 let sql = format!(
857 "SELECT m.id, m.name, m.body
858 FROM memories m
859 WHERE m.namespace = ?1
860 AND m.deleted_at IS NULL
861 AND NOT EXISTS (
862 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
863 )
864 ORDER BY m.id
865 {limit_clause}"
866 );
867 let mut stmt = conn.prepare(&sql)?;
868 let rows = stmt
869 .query_map(rusqlite::params![namespace], |r| {
870 Ok((
871 r.get::<_, i64>(0)?,
872 r.get::<_, String>(1)?,
873 r.get::<_, String>(2)?,
874 ))
875 })?
876 .collect::<Result<Vec<_>, _>>()?;
877 Ok(rows)
878 } else {
879 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
881 .map(|i| format!("?{i}"))
882 .collect();
883 let in_clause = placeholders.join(", ");
884 let sql = format!(
885 "SELECT m.id, m.name, m.body
886 FROM memories m
887 WHERE m.namespace = ?1
888 AND m.deleted_at IS NULL
889 AND m.name IN ({in_clause})
890 AND NOT EXISTS (
891 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
892 )
893 ORDER BY m.id
894 {limit_clause}"
895 );
896 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
897 params_vec.push(&namespace);
898 for n in name_filter {
899 params_vec.push(n);
900 }
901 let mut stmt = conn.prepare(&sql)?;
902 let rows = stmt
903 .query_map(
904 rusqlite::params_from_iter(params_vec.iter().copied()),
905 |r| {
906 Ok((
907 r.get::<_, i64>(0)?,
908 r.get::<_, String>(1)?,
909 r.get::<_, String>(2)?,
910 ))
911 },
912 )?
913 .collect::<Result<Vec<_>, _>>()?;
914 Ok(rows)
915 }
916}
917
918fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
923 let content = std::fs::read_to_string(path).map_err(|e| {
924 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
925 })?;
926 let mut seen = std::collections::HashSet::new();
927 let mut out = Vec::new();
928 for line in content.lines() {
929 let trimmed = line.trim();
930 if trimmed.is_empty() || trimmed.starts_with('#') {
931 continue;
932 }
933 if seen.insert(trimmed.to_string()) {
934 out.push(trimmed.to_string());
935 }
936 }
937 Ok(out)
938}
939
940fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
942 let mut combined: Vec<String> = args.names.clone();
943 if let Some(p) = &args.names_file {
944 let from_file = read_names_file(p)?;
945 for n in from_file {
946 if !combined.contains(&n) {
947 combined.push(n);
948 }
949 }
950 }
951 Ok(combined)
952}
953
954fn scan_entities_without_description(
958 conn: &Connection,
959 namespace: &str,
960 limit: Option<usize>,
961) -> Result<Vec<(i64, String, String)>, AppError> {
962 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
963 let sql = format!(
964 "SELECT id, name, type
965 FROM entities
966 WHERE namespace = ?1
967 AND (description IS NULL OR description = '')
968 ORDER BY id
969 {limit_clause}"
970 );
971 let mut stmt = conn.prepare(&sql)?;
972 let rows = stmt
973 .query_map(rusqlite::params![namespace], |r| {
974 Ok((
975 r.get::<_, i64>(0)?,
976 r.get::<_, String>(1)?,
977 r.get::<_, String>(2)?,
978 ))
979 })?
980 .collect::<Result<Vec<_>, _>>()?;
981 Ok(rows)
982}
983
984fn scan_short_body_memories(
988 conn: &Connection,
989 namespace: &str,
990 min_chars: usize,
991 limit: Option<usize>,
992) -> Result<Vec<(i64, String, String)>, AppError> {
993 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
994 let sql = format!(
995 "SELECT m.id, m.name, m.body
996 FROM memories m
997 WHERE m.namespace = ?1
998 AND m.deleted_at IS NULL
999 AND LENGTH(COALESCE(m.body,'')) < ?2
1000 ORDER BY m.id
1001 {limit_clause}"
1002 );
1003 let mut stmt = conn.prepare(&sql)?;
1004 let rows = stmt
1005 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1006 Ok((
1007 r.get::<_, i64>(0)?,
1008 r.get::<_, String>(1)?,
1009 r.get::<_, String>(2)?,
1010 ))
1011 })?
1012 .collect::<Result<Vec<_>, _>>()?;
1013 Ok(rows)
1014}
1015
1016fn scan_memories_without_embeddings(
1020 conn: &Connection,
1021 namespace: &str,
1022 limit: Option<usize>,
1023 name_filter: &[String],
1024) -> Result<Vec<(i64, String, String)>, AppError> {
1025 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1026
1027 if name_filter.is_empty() {
1028 let sql = format!(
1029 "SELECT m.id, m.name, COALESCE(m.body,'')
1030 FROM memories m
1031 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1032 WHERE m.namespace = ?1
1033 AND m.deleted_at IS NULL
1034 AND me.memory_id IS NULL
1035 ORDER BY m.id
1036 {limit_clause}"
1037 );
1038 let mut stmt = conn.prepare(&sql)?;
1039 let rows = stmt
1040 .query_map(rusqlite::params![namespace], |r| {
1041 Ok((
1042 r.get::<_, i64>(0)?,
1043 r.get::<_, String>(1)?,
1044 r.get::<_, String>(2)?,
1045 ))
1046 })?
1047 .collect::<Result<Vec<_>, _>>()?;
1048 Ok(rows)
1049 } else {
1050 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1051 .map(|i| format!("?{i}"))
1052 .collect();
1053 let in_clause = placeholders.join(", ");
1054 let sql = format!(
1055 "SELECT m.id, m.name, COALESCE(m.body,'')
1056 FROM memories m
1057 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1058 WHERE m.namespace = ?1
1059 AND m.deleted_at IS NULL
1060 AND m.name IN ({in_clause})
1061 AND me.memory_id IS NULL
1062 ORDER BY m.id
1063 {limit_clause}"
1064 );
1065 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1066 params_vec.push(&namespace);
1067 for n in name_filter {
1068 params_vec.push(n);
1069 }
1070 let mut stmt = conn.prepare(&sql)?;
1071 let rows = stmt
1072 .query_map(
1073 rusqlite::params_from_iter(params_vec.iter().copied()),
1074 |r| {
1075 Ok((
1076 r.get::<_, i64>(0)?,
1077 r.get::<_, String>(1)?,
1078 r.get::<_, String>(2)?,
1079 ))
1080 },
1081 )?
1082 .collect::<Result<Vec<_>, _>>()?;
1083 Ok(rows)
1084 }
1085}
1086
1087#[allow(clippy::type_complexity)]
1089fn scan_weight_candidates(
1090 conn: &Connection,
1091 namespace: &str,
1092 limit: Option<usize>,
1093) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1094 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1095 let sql = format!(
1096 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1097 FROM relationships r \
1098 JOIN entities e1 ON e1.id = r.source_id \
1099 JOIN entities e2 ON e2.id = r.target_id \
1100 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1101 ORDER BY r.weight DESC {limit_clause}"
1102 );
1103 let mut stmt = conn.prepare(&sql)?;
1104 let rows = stmt
1105 .query_map(rusqlite::params![namespace], |r| {
1106 Ok((
1107 r.get::<_, i64>(0)?,
1108 r.get::<_, String>(1)?,
1109 r.get::<_, String>(2)?,
1110 r.get::<_, String>(3)?,
1111 r.get::<_, f64>(4)?,
1112 ))
1113 })?
1114 .collect::<Result<Vec<_>, _>>()?;
1115 Ok(rows)
1116}
1117
1118fn scan_generic_relations(
1120 conn: &Connection,
1121 namespace: &str,
1122 limit: Option<usize>,
1123) -> Result<Vec<(i64, String, String, String)>, AppError> {
1124 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1125 let sql = format!(
1126 "SELECT r.id, e1.name, e2.name, r.relation \
1127 FROM relationships r \
1128 JOIN entities e1 ON e1.id = r.source_id \
1129 JOIN entities e2 ON e2.id = r.target_id \
1130 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1131 ORDER BY r.id {limit_clause}"
1132 );
1133 let mut stmt = conn.prepare(&sql)?;
1134 let rows = stmt
1135 .query_map(rusqlite::params![namespace], |r| {
1136 Ok((
1137 r.get::<_, i64>(0)?,
1138 r.get::<_, String>(1)?,
1139 r.get::<_, String>(2)?,
1140 r.get::<_, String>(3)?,
1141 ))
1142 })?
1143 .collect::<Result<Vec<_>, _>>()?;
1144 Ok(rows)
1145}
1146
1147fn persist_memory_bindings(
1156 conn: &Connection,
1157 namespace: &str,
1158 memory_id: i64,
1159 entities_json: &serde_json::Value,
1160 rels_json: &serde_json::Value,
1161) -> Result<(usize, usize), AppError> {
1162 #[derive(Deserialize)]
1163 struct EntityItem {
1164 name: String,
1165 entity_type: String,
1166 }
1167 #[derive(Deserialize)]
1168 struct RelItem {
1169 source: String,
1170 target: String,
1171 relation: String,
1172 strength: f64,
1173 }
1174
1175 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1176 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1177
1178 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1179 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1180
1181 let mut ent_count = 0usize;
1182 let mut rel_count = 0usize;
1183
1184 for item in &extracted_entities {
1185 let entity_type = match item.entity_type.parse::<EntityType>() {
1186 Ok(et) => et,
1187 Err(_) => {
1188 tracing::warn!(
1189 target: "enrich",
1190 entity = %item.name,
1191 entity_type = %item.entity_type,
1192 "entity type not recognized, skipping"
1193 );
1194 continue;
1195 }
1196 };
1197 match entities::upsert_entity(
1198 conn,
1199 namespace,
1200 &NewEntity {
1201 name: item.name.clone(),
1202 entity_type,
1203 description: None,
1204 },
1205 ) {
1206 Ok(eid) => {
1207 let _ = entities::link_memory_entity(conn, memory_id, eid);
1208 ent_count += 1;
1209 }
1210 Err(e) => {
1211 tracing::warn!(
1212 target: "enrich",
1213 entity = %item.name,
1214 error = %e,
1215 "entity upsert skipped"
1216 );
1217 }
1218 }
1219 }
1220
1221 for rel in &extracted_rels {
1222 let normalized = crate::parsers::normalize_relation(&rel.relation);
1223 crate::parsers::warn_if_non_canonical(&normalized);
1224
1225 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1228 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1229 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1230 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1231 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1232 let new_rel = NewRelationship {
1233 source: rel.source.clone(),
1234 target: rel.target.clone(),
1235 relation: normalized,
1236 strength: rel.strength,
1237 description: None,
1238 };
1239 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1240 rel_count += 1;
1241 }
1242 }
1243 }
1244
1245 Ok((ent_count, rel_count))
1246}
1247
1248fn persist_entity_description(
1250 conn: &Connection,
1251 entity_id: i64,
1252 description: &str,
1253) -> Result<(), AppError> {
1254 conn.execute(
1255 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1256 rusqlite::params![description, entity_id],
1257 )?;
1258 Ok(())
1259}
1260
1261fn reembed_memory_vector(
1262 conn: &Connection,
1263 namespace: &str,
1264 memory_id: i64,
1265 memory_name: &str,
1266 memory_type: &str,
1267 body: &str,
1268 paths: &crate::paths::AppPaths,
1269) -> Result<(), AppError> {
1270 let snippet: String = body.chars().take(200).collect();
1271 let embedding = crate::embedder::embed_passage_local(&paths.models, body)?;
1272 memories::upsert_vec(
1273 conn,
1274 memory_id,
1275 namespace,
1276 memory_type,
1277 &embedding,
1278 memory_name,
1279 &snippet,
1280 )?;
1281 Ok(())
1282}
1283
1284fn persist_enriched_body(
1289 conn: &Connection,
1290 namespace: &str,
1291 memory_id: i64,
1292 memory_name: &str,
1293 new_body: &str,
1294 paths: &crate::paths::AppPaths,
1295) -> Result<(), AppError> {
1296 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1298 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1299 rusqlite::params![memory_id],
1300 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1301 )?;
1302
1303 let memory_type: String = conn.query_row(
1304 "SELECT type FROM memories WHERE id=?1",
1305 rusqlite::params![memory_id],
1306 |r| r.get(0),
1307 )?;
1308
1309 let description: String = conn.query_row(
1310 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1311 rusqlite::params![memory_id],
1312 |r| r.get(0),
1313 )?;
1314
1315 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1316
1317 let new_memory = memories::NewMemory {
1318 namespace: namespace.to_string(),
1319 name: memory_name.to_string(),
1320 memory_type: memory_type.clone(),
1321 description: description.clone(),
1322 body: new_body.to_string(),
1323 body_hash,
1324 session_id: None,
1325 source: "agent".to_string(),
1326 metadata: serde_json::json!({
1327 "operation": "body-enrich",
1328 "orig_chars": old_body.chars().count(),
1329 "new_chars": new_body.chars().count(),
1330 }),
1331 };
1332
1333 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1337 let version_metadata = serde_json::json!({
1338 "operation": "body-enrich",
1339 "orig_chars": old_body.chars().count(),
1340 "new_chars": new_body.chars().count(),
1341 })
1342 .to_string();
1343 crate::storage::versions::insert_version(
1344 conn,
1345 memory_id,
1346 next_version,
1347 memory_name,
1348 &memory_type,
1349 &description,
1350 new_body,
1351 &version_metadata,
1352 Some("enrich"),
1353 "edit",
1354 )?;
1355
1356 memories::update(conn, memory_id, &new_memory, None)?;
1357 memories::sync_fts_after_update(
1358 conn,
1359 memory_id,
1360 &old_name,
1361 &old_desc,
1362 &old_body,
1363 &new_memory.name,
1364 &new_memory.description,
1365 &new_memory.body,
1366 )?;
1367
1368 if let Err(e) = reembed_memory_vector(
1370 conn,
1371 namespace,
1372 memory_id,
1373 memory_name,
1374 &memory_type,
1375 new_body,
1376 paths,
1377 ) {
1378 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1379 }
1380
1381 Ok(())
1382}
1383
/// Returns `true` when `value` compares equal to `default`.
///
/// Used to detect flags left at their default value. The check cannot
/// distinguish "flag omitted" from "flag passed with the default value".
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1398
1399fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1414 const DEFAULT_TIMEOUT: u64 = 300;
1415
1416 let mut conflicts: Vec<String> = Vec::new();
1417
1418 match args.mode {
1419 EnrichMode::ClaudeCode => {
1420 if args.codex_binary.is_some() {
1421 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1422 }
1423 if args.codex_model.is_some() {
1424 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1425 }
1426 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1427 conflicts.push(format!(
1428 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1429 args.codex_timeout
1430 ));
1431 }
1432 }
1433 EnrichMode::Codex => {
1434 if args.claude_binary.is_some() {
1435 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1436 }
1437 if args.claude_model.is_some() {
1438 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1439 }
1440 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1441 conflicts.push(format!(
1442 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1443 args.claude_timeout
1444 ));
1445 }
1446 if args.max_cost_usd.is_some() {
1447 conflicts.push(
1448 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1449 .to_string(),
1450 );
1451 }
1452 }
1453 }
1454
1455 if !conflicts.is_empty() {
1456 return Err(AppError::Validation(format!(
1457 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1458 args.mode,
1459 conflicts.join("\n - ")
1460 )));
1461 }
1462
1463 Ok(())
1464}
1465
1466pub fn run(args: &EnrichArgs) -> Result<(), AppError> {
1470 validate_mode_conditional_flags_enrich(args)?;
1473 let started = Instant::now();
1474
1475 let paths = AppPaths::resolve(args.db.as_deref())?;
1476 ensure_db_ready(&paths)?;
1477 let conn = open_rw(&paths.db)?;
1478 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1479
1480 let wait_secs = args.wait_job_singleton;
1486 let force_flag = args.force_job_singleton;
1487 let _singleton = crate::lock::acquire_job_singleton(
1488 crate::lock::JobType::Enrich,
1489 &namespace,
1490 &paths.db,
1491 wait_secs,
1492 force_flag,
1493 )?;
1494
1495 let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1497 None
1498 } else {
1499 Some(match args.mode {
1500 EnrichMode::ClaudeCode => {
1501 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1502 let version = super::claude_runner::validate_claude_version(&bin)?;
1503 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1504 emit_json(&PhaseEvent {
1505 phase: "validate",
1506 binary_path: bin.to_str(),
1507 version: Some(&version),
1508 items_total: None,
1509 items_pending: None,
1510 llm_parallelism: None,
1511 });
1512 bin
1513 }
1514 EnrichMode::Codex => {
1515 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1516 emit_json(&PhaseEvent {
1517 phase: "validate",
1518 binary_path: bin.to_str(),
1519 version: None,
1520 items_total: None,
1521 items_pending: None,
1522 llm_parallelism: None,
1523 });
1524 bin
1525 }
1526 })
1527 };
1528
1529 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1533 let load = crate::system_load::load_average_one();
1534 let n = crate::system_load::ncpus();
1535 return Err(AppError::Validation(format!(
1536 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1537 pass --no-max-load-check to override (not recommended)"
1538 )));
1539 }
1540
1541 if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1548 {
1549 let preflight_result = run_preflight_probe(args);
1550 match preflight_result {
1551 PreflightOutcome::Healthy => {
1552 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1553 }
1554 PreflightOutcome::RateLimited { reason, suggestion } => {
1555 if let Some(fallback) = args.fallback_mode.clone() {
1556 if fallback != args.mode {
1557 return Err(AppError::Validation(format!(
1567 "preflight detected rate limit on {mode:?}: {reason}; \
1568 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1569 mode = args.mode
1570 )));
1571 }
1572 return Err(AppError::Validation(format!(
1573 "preflight detected rate limit on {mode:?}: {reason}; \
1574 --fallback-mode matches --mode, no recovery possible",
1575 mode = args.mode
1576 )));
1577 }
1578 return Err(AppError::Validation(format!(
1579 "preflight detected rate limit on {mode:?}: {reason}; \
1580 {suggestion}; pass --fallback-mode codex to recover",
1581 mode = args.mode
1582 )));
1583 }
1584 PreflightOutcome::Error(e) => {
1585 return Err(e);
1586 }
1587 }
1588 }
1589
1590 let scan_result = scan_operation(&conn, &namespace, args)?;
1592 let total = scan_result.len();
1593
1594 emit_json(&PhaseEvent {
1595 phase: "scan",
1596 binary_path: None,
1597 version: None,
1598 items_total: Some(total),
1599 items_pending: Some(total),
1600 llm_parallelism: Some(args.llm_parallelism),
1601 });
1602
1603 if args.dry_run {
1605 for (idx, key) in scan_result.iter().enumerate() {
1606 emit_json(&ItemEvent {
1607 item: key,
1608 status: "preview",
1609 memory_id: None,
1610 entity_id: None,
1611 entities: None,
1612 rels: None,
1613 chars_before: None,
1614 chars_after: None,
1615 cost_usd: None,
1616 elapsed_ms: None,
1617 error: None,
1618 index: idx,
1619 total,
1620 });
1621 }
1622 emit_json(&EnrichSummary {
1623 summary: true,
1624 operation: format!("{:?}", args.operation),
1625 items_total: total,
1626 completed: 0,
1627 failed: 0,
1628 skipped: 0,
1629 cost_usd: 0.0,
1630 elapsed_ms: started.elapsed().as_millis() as u64,
1631 });
1632 return Ok(());
1633 }
1634
1635 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1639
1640 if args.resume {
1641 let reset = queue_conn
1642 .execute(
1643 "UPDATE queue SET status='pending' WHERE status='processing'",
1644 [],
1645 )
1646 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1647 if reset > 0 {
1648 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1649 }
1650 }
1651
1652 if args.retry_failed {
1653 let count = queue_conn
1654 .execute(
1655 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1656 [],
1657 )
1658 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1659 tracing::info!(target: "enrich", count, "retrying failed items");
1660 }
1661
1662 if !args.resume && !args.retry_failed {
1663 queue_conn
1664 .execute("DELETE FROM queue", [])
1665 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1666 }
1667
1668 for (idx, key) in scan_result.iter().enumerate() {
1670 let item_type = match args.operation {
1671 EnrichOperation::EntityDescriptions => "entity",
1672 _ => "memory",
1673 };
1674 if let Err(e) = queue_conn.execute(
1675 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1676 rusqlite::params![key, item_type],
1677 ) {
1678 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1679 }
1680 let _ = idx; }
1682
1683 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1686 if parallelism > 1 {
1687 tracing::info!(
1688 target: "enrich",
1689 llm_parallelism = parallelism,
1690 "parallel LLM processing with bounded thread pool"
1691 );
1692 }
1693 if parallelism > 4 {
1697 match args.mode {
1698 EnrichMode::ClaudeCode => {
1699 tracing::warn!(
1700 target: "enrich",
1701 llm_parallelism = parallelism,
1702 recommended_max = 4,
1703 mode = "claude-code",
1704 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1705 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1706 to cut MCP children (G28-A)"
1707 );
1708 }
1709 EnrichMode::Codex if parallelism > 16 => {
1710 tracing::warn!(
1711 target: "enrich",
1712 llm_parallelism = parallelism,
1713 recommended_max = 16,
1714 mode = "codex",
1715 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1716 consider --llm-parallelism 8 for safer concurrency"
1717 );
1718 }
1719 EnrichMode::Codex => {
1720 }
1724 }
1725 }
1726
1727 let mut completed = 0usize;
1728 let mut failed = 0usize;
1729 let mut skipped = 0usize;
1730 let mut cost_total = 0.0f64;
1731 let mut oauth_detected = false;
1732 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1733 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1734 let enrich_started = std::time::Instant::now();
1735
1736 let provider_timeout = match args.mode {
1737 EnrichMode::ClaudeCode => args.claude_timeout,
1738 EnrichMode::Codex => args.codex_timeout,
1739 };
1740
1741 let provider_model: Option<&str> = match args.mode {
1742 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1743 EnrichMode::Codex => args.codex_model.as_deref(),
1744 };
1745
1746 if parallelism > 1 {
1750 let stdout_mu = parking_lot::Mutex::new(());
1751 let budget = args.max_cost_usd;
1752 let operation = args.operation.clone();
1753 let mode = args.mode.clone();
1754 let min_oc = args.min_output_chars;
1755 let max_oc = args.max_output_chars;
1756 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1757
1758 struct WorkerResult {
1759 completed: usize,
1760 failed: usize,
1761 skipped: usize,
1762 cost: f64,
1763 oauth: bool,
1764 }
1765
1766 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1767 let handles: Vec<_> = (0..parallelism)
1768 .map(|worker_id| {
1769 let stdout_mu = &stdout_mu;
1770 let paths = &paths;
1771 let namespace = &namespace;
1772 let provider_binary = provider_binary.as_deref();
1773 let operation = &operation;
1774 let mode = &mode;
1775 let prompt_tpl = prompt_tpl.as_deref();
1776 s.spawn(move || {
1777 let w_conn = match open_rw(&paths.db) {
1778 Ok(c) => c,
1779 Err(e) => {
1780 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1781 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1782 }
1783 };
1784 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1785 Ok(c) => c,
1786 Err(e) => {
1787 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1788 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1789 }
1790 };
1791 let mut w_completed = 0usize;
1792 let mut w_failed = 0usize;
1793 let mut w_skipped = 0usize;
1794 let mut w_cost = 0.0f64;
1795 let mut w_oauth = false;
1796 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1797 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1798 let mut w_breaker = crate::retry::CircuitBreaker::new(
1804 args.circuit_breaker_threshold.max(1),
1805 std::time::Duration::from_secs(60),
1806 );
1807
1808 loop {
1809 if crate::shutdown_requested() {
1810 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1811 break;
1812 }
1813 if let Some(b) = budget {
1814 if !w_oauth && w_cost >= b {
1815 break;
1816 }
1817 }
1818 let pending: Option<(i64, String, String)> = w_queue
1819 .query_row(
1820 "UPDATE queue SET status='processing', attempt=attempt+1 \
1821 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1822 RETURNING id, item_key, item_type",
1823 [],
1824 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1825 )
1826 .ok();
1827 let (queue_id, item_key, _item_type) = match pending {
1828 Some(p) => p,
1829 None => break,
1830 };
1831 let item_started = Instant::now();
1832 let current_index = w_completed + w_failed + w_skipped;
1833
1834 let call_result = match operation {
1835 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1836 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1837 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths),
1838 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths),
1839 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1840 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1841 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1842 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1843 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1844 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1845 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1846 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1847 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1848 };
1849
1850 match call_result {
1851 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1852 if is_oauth { w_oauth = true; }
1853 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1854 let _ = w_queue.execute(
1855 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1856 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1857 );
1858 w_completed += 1;
1859 if !is_oauth { w_cost += cost; }
1860 let _ = w_breaker
1862 .record(crate::retry::AttemptOutcome::Success);
1863 let _guard = stdout_mu.lock();
1864 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1865 }
1866 Ok(EnrichItemResult::Skipped { reason }) => {
1867 w_skipped += 1;
1868 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1869 let _guard = stdout_mu.lock();
1870 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1871 }
1872 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1873 w_skipped += 1;
1879 let reason = format!(
1880 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1881 );
1882 let _ = w_queue.execute(
1883 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1884 rusqlite::params![reason, queue_id],
1885 );
1886 let _guard = stdout_mu.lock();
1887 emit_json(&ItemEvent {
1888 item: &item_key,
1889 status: "preservation_failed",
1890 memory_id: None,
1891 entity_id: None,
1892 entities: None,
1893 rels: None,
1894 chars_before: Some(chars_before),
1895 chars_after: Some(chars_after),
1896 cost_usd: None,
1897 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1898 error: Some(reason),
1899 index: current_index,
1900 total,
1901 });
1902 }
1903 Err(e) => {
1904 let err_str = format!("{e}");
1905 if matches!(e, AppError::RateLimited { .. }) {
1906 if crate::retry::is_kill_switch_active() {
1907 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1908 } else if std::time::Instant::now() >= w_deadline {
1909 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1910 } else {
1911 let half = w_backoff / 2;
1912 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1913 let actual_wait = half + jitter;
1914 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1915 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1916 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1917 w_backoff = (w_backoff * 2).min(900);
1918 continue;
1919 }
1920 }
1921 w_failed += 1;
1922 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1923 let _guard = stdout_mu.lock();
1924 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1925 let breaker_opened = w_breaker
1927 .record(crate::retry::AttemptOutcome::HardFailure);
1928 if breaker_opened {
1929 tracing::error!(target: "enrich",
1930 consecutive_failures = w_breaker.consecutive_failures(),
1931 "circuit breaker opened — aborting worker"
1932 );
1933 break;
1934 }
1935 }
1936 }
1937 }
1938 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1939 })
1940 })
1941 .collect();
1942 handles
1943 .into_iter()
1944 .map(|h| {
1945 h.join().unwrap_or(WorkerResult {
1946 completed: 0,
1947 failed: 0,
1948 skipped: 0,
1949 cost: 0.0,
1950 oauth: false,
1951 })
1952 })
1953 .collect()
1954 });
1955
1956 for r in &results {
1957 completed += r.completed;
1958 failed += r.failed;
1959 skipped += r.skipped;
1960 cost_total += r.cost;
1961 if r.oauth && !oauth_detected {
1962 oauth_detected = true;
1963 }
1964 }
1965 } else {
1966 loop {
1968 if crate::shutdown_requested() {
1969 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
1970 break;
1971 }
1972
1973 if let Some(budget) = args.max_cost_usd {
1975 if !oauth_detected && cost_total >= budget {
1976 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1977 break;
1978 }
1979 }
1980
1981 let pending: Option<(i64, String, String)> = queue_conn
1983 .query_row(
1984 "UPDATE queue SET status='processing', attempt=attempt+1 \
1985 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1986 RETURNING id, item_key, item_type",
1987 [],
1988 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1989 )
1990 .ok();
1991
1992 let (queue_id, item_key, item_type) = match pending {
1993 Some(p) => p,
1994 None => break,
1995 };
1996
1997 let item_started = Instant::now();
1998 let current_index = completed + failed + skipped;
1999
2000 let call_result = match args.operation {
2001 EnrichOperation::MemoryBindings => call_memory_bindings(
2002 &conn,
2003 &namespace,
2004 &item_key,
2005 provider_binary
2006 .as_deref()
2007 .expect("provider binary required"),
2008 provider_model,
2009 provider_timeout,
2010 &args.mode,
2011 ),
2012 EnrichOperation::EntityDescriptions => call_entity_description(
2013 &conn,
2014 &namespace,
2015 &item_key,
2016 provider_binary
2017 .as_deref()
2018 .expect("provider binary required"),
2019 provider_model,
2020 provider_timeout,
2021 &args.mode,
2022 ),
2023 EnrichOperation::BodyEnrich => call_body_enrich(
2024 &conn,
2025 &namespace,
2026 &item_key,
2027 provider_binary
2028 .as_deref()
2029 .expect("provider binary required"),
2030 provider_model,
2031 provider_timeout,
2032 &args.mode,
2033 args.min_output_chars,
2034 args.max_output_chars,
2035 args.prompt_template.as_deref(),
2036 args.preserve_threshold,
2037 &paths,
2038 ),
2039 EnrichOperation::ReEmbed => call_reembed(&conn, &namespace, &item_key, &paths),
2040 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2041 &conn,
2042 &namespace,
2043 &item_key,
2044 provider_binary
2045 .as_deref()
2046 .expect("provider binary required"),
2047 provider_model,
2048 provider_timeout,
2049 &args.mode,
2050 ),
2051 EnrichOperation::RelationReclassify => call_relation_reclassify(
2052 &conn,
2053 &namespace,
2054 &item_key,
2055 provider_binary
2056 .as_deref()
2057 .expect("provider binary required"),
2058 provider_model,
2059 provider_timeout,
2060 &args.mode,
2061 ),
2062 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2063 call_entity_connect(
2064 &conn,
2065 &namespace,
2066 &item_key,
2067 provider_binary
2068 .as_deref()
2069 .expect("provider binary required"),
2070 provider_model,
2071 provider_timeout,
2072 &args.mode,
2073 )
2074 }
2075 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2076 &conn,
2077 &namespace,
2078 &item_key,
2079 provider_binary
2080 .as_deref()
2081 .expect("provider binary required"),
2082 provider_model,
2083 provider_timeout,
2084 &args.mode,
2085 ),
2086 EnrichOperation::DescriptionEnrich => call_description_enrich(
2087 &conn,
2088 &namespace,
2089 &item_key,
2090 provider_binary
2091 .as_deref()
2092 .expect("provider binary required"),
2093 provider_model,
2094 provider_timeout,
2095 &args.mode,
2096 ),
2097 EnrichOperation::DomainClassify => call_domain_classify(
2098 &conn,
2099 &namespace,
2100 &item_key,
2101 provider_binary
2102 .as_deref()
2103 .expect("provider binary required"),
2104 provider_model,
2105 provider_timeout,
2106 &args.mode,
2107 ),
2108 EnrichOperation::GraphAudit => call_graph_audit(
2109 &conn,
2110 &namespace,
2111 &item_key,
2112 provider_binary
2113 .as_deref()
2114 .expect("provider binary required"),
2115 provider_model,
2116 provider_timeout,
2117 &args.mode,
2118 ),
2119 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2120 &conn,
2121 &namespace,
2122 &item_key,
2123 provider_binary
2124 .as_deref()
2125 .expect("provider binary required"),
2126 provider_model,
2127 provider_timeout,
2128 &args.mode,
2129 ),
2130 EnrichOperation::BodyExtract => call_body_extract(
2131 &conn,
2132 &namespace,
2133 &item_key,
2134 provider_binary
2135 .as_deref()
2136 .expect("provider binary required"),
2137 provider_model,
2138 provider_timeout,
2139 &args.mode,
2140 ),
2141 };
2142
2143 match call_result {
2144 Ok(EnrichItemResult::Done {
2145 memory_id,
2146 entity_id,
2147 entities,
2148 rels,
2149 chars_before,
2150 chars_after,
2151 cost,
2152 is_oauth,
2153 }) => {
2154 if is_oauth && !oauth_detected {
2155 oauth_detected = true;
2156 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2157 }
2158 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2159
2160 let persist_err: Option<String> = match args.operation {
2162 EnrichOperation::MemoryBindings => {
2163 None
2165 }
2166 EnrichOperation::EntityDescriptions => {
2167 None
2169 }
2170 EnrichOperation::BodyEnrich => {
2171 None
2173 }
2174 _ => {
2175 None
2177 }
2178 };
2179
2180 if let Err(e) = queue_conn.execute(
2181 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2182 rusqlite::params![
2183 memory_id,
2184 entity_id,
2185 entities as i64,
2186 rels as i64,
2187 cost,
2188 item_started.elapsed().as_millis() as i64,
2189 queue_id
2190 ],
2191 ) {
2192 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2193 }
2194
2195 if persist_err.is_none() {
2196 completed += 1;
2197 if !is_oauth {
2198 cost_total += cost;
2199 }
2200 emit_json(&ItemEvent {
2201 item: &item_key,
2202 status: "done",
2203 memory_id,
2204 entity_id,
2205 entities: Some(entities),
2206 rels: Some(rels),
2207 chars_before,
2208 chars_after,
2209 cost_usd: if is_oauth { None } else { Some(cost) },
2210 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2211 error: None,
2212 index: current_index,
2213 total,
2214 });
2215 } else {
2216 failed += 1;
2217 emit_json(&ItemEvent {
2218 item: &item_key,
2219 status: "failed",
2220 memory_id: None,
2221 entity_id: None,
2222 entities: None,
2223 rels: None,
2224 chars_before: None,
2225 chars_after: None,
2226 cost_usd: None,
2227 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2228 error: persist_err,
2229 index: current_index,
2230 total,
2231 });
2232 }
2233 }
2234 Ok(EnrichItemResult::Skipped { reason }) => {
2235 skipped += 1;
2236 if let Err(e) = queue_conn.execute(
2237 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2238 rusqlite::params![reason, queue_id],
2239 ) {
2240 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2241 }
2242 emit_json(&ItemEvent {
2243 item: &item_key,
2244 status: "skipped",
2245 memory_id: None,
2246 entity_id: None,
2247 entities: None,
2248 rels: None,
2249 chars_before: None,
2250 chars_after: None,
2251 cost_usd: None,
2252 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2253 error: None,
2254 index: current_index,
2255 total,
2256 });
2257 }
2258 Ok(EnrichItemResult::PreservationFailed {
2259 score,
2260 threshold,
2261 chars_before,
2262 chars_after,
2263 }) => {
2264 skipped += 1;
2271 let reason = format!(
2272 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2273 );
2274 if let Err(qe) = queue_conn.execute(
2275 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2276 rusqlite::params![reason, queue_id],
2277 ) {
2278 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2279 }
2280 emit_json(&ItemEvent {
2281 item: &item_key,
2282 status: "preservation_failed",
2283 memory_id: None,
2284 entity_id: None,
2285 entities: None,
2286 rels: None,
2287 chars_before: Some(chars_before),
2288 chars_after: Some(chars_after),
2289 cost_usd: None,
2290 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2291 error: Some(reason),
2292 index: current_index,
2293 total,
2294 });
2295 }
2296 Err(e) => {
2297 let err_str = format!("{e}");
2298 if matches!(e, AppError::RateLimited { .. }) {
2299 if crate::retry::is_kill_switch_active() {
2300 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2301 } else if std::time::Instant::now() >= rate_limit_deadline {
2302 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2303 } else {
2304 let half = backoff_secs / 2;
2305 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2306 let actual_wait = half + jitter;
2307 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2308 if let Err(qe) = queue_conn.execute(
2309 "UPDATE queue SET status='pending' WHERE id=?1",
2310 rusqlite::params![queue_id],
2311 ) {
2312 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2313 }
2314 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2315 backoff_secs = (backoff_secs * 2).min(900);
2316 continue;
2317 }
2318 }
2319
2320 failed += 1;
2321 if let Err(qe) = queue_conn.execute(
2322 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2323 rusqlite::params![err_str, queue_id],
2324 ) {
2325 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2326 }
2327 emit_json(&ItemEvent {
2328 item: &item_key,
2329 status: "failed",
2330 memory_id: None,
2331 entity_id: None,
2332 entities: None,
2333 rels: None,
2334 chars_before: None,
2335 chars_after: None,
2336 cost_usd: None,
2337 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2338 error: Some(err_str),
2339 index: current_index,
2340 total,
2341 });
2342 }
2343 }
2344
2345 let _ = item_type; }
2347 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2350 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2351
2352 emit_json(&EnrichSummary {
2353 summary: true,
2354 operation: format!("{:?}", args.operation),
2355 items_total: total,
2356 completed,
2357 failed,
2358 skipped,
2359 cost_usd: cost_total,
2360 elapsed_ms: started.elapsed().as_millis() as u64,
2361 });
2362
2363 if failed == 0 {
2364 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2365 }
2366
2367 Ok(())
2368}
2369
/// Outcome of enriching a single queued item.
///
/// The run loop maps each variant onto a queue status and an emitted
/// `ItemEvent`: `Done` -> `done`, `Skipped` and `PreservationFailed` ->
/// `skipped` (the latter emitted with status `preservation_failed`).
enum EnrichItemResult {
    /// The item was processed and its changes (if any) were written.
    Done {
        /// Memory row the operation worked on, when the item is a memory.
        memory_id: Option<i64>,
        /// Entity row the operation worked on, when the item is an entity.
        entity_id: Option<i64>,
        /// Number of entities reported by the operation.
        entities: usize,
        /// Number of relationships reported by the operation. Some operations
        /// reuse this counter for other tallies (graph audit reports its
        /// issue count here).
        rels: usize,
        /// Text length before the operation, for operations that rewrite text.
        chars_before: Option<usize>,
        /// Text length after the operation, for operations that rewrite text.
        chars_after: Option<usize>,
        /// Cost reported by the LLM backend for this item.
        cost: f64,
        /// When true, `cost` is not added to the run total and is omitted
        /// from the emitted event.
        is_oauth: bool,
    },
    /// Nothing was written; `reason` is stored in the queue's `error` column.
    Skipped {
        reason: String,
    },
    /// A rewritten body was rejected by the preservation check and discarded.
    PreservationFailed {
        /// Similarity score (reported as `jaccard=` in the queue reason).
        score: f64,
        /// Minimum score that would have been accepted.
        threshold: f64,
        /// Character count of the original body.
        chars_before: usize,
        /// Character count of the rejected rewrite.
        chars_after: usize,
    },
}
2399
2400fn call_memory_bindings(
2405 conn: &Connection,
2406 namespace: &str,
2407 memory_name: &str,
2408 binary: &Path,
2409 model: Option<&str>,
2410 timeout: u64,
2411 mode: &EnrichMode,
2412) -> Result<EnrichItemResult, AppError> {
2413 let (memory_id, body): (i64, String) = conn.query_row(
2415 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2416 rusqlite::params![namespace, memory_name],
2417 |r| Ok((r.get(0)?, r.get(1)?)),
2418 ).map_err(|e| match e {
2419 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2420 other => AppError::Database(other),
2421 })?;
2422
2423 if body.trim().is_empty() {
2424 return Ok(EnrichItemResult::Skipped {
2425 reason: "body is empty".to_string(),
2426 });
2427 }
2428
2429 let (value, cost, is_oauth) = match mode {
2430 EnrichMode::ClaudeCode => call_claude(
2431 binary,
2432 BINDINGS_PROMPT,
2433 BINDINGS_SCHEMA,
2434 &body,
2435 model,
2436 timeout,
2437 )?,
2438 EnrichMode::Codex => call_codex(
2439 binary,
2440 BINDINGS_PROMPT,
2441 BINDINGS_SCHEMA,
2442 &body,
2443 model,
2444 timeout,
2445 )?,
2446 };
2447
2448 let empty_arr = serde_json::Value::Array(vec![]);
2449 let entities_val = value.get("entities").unwrap_or(&empty_arr);
2450 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2451
2452 let (ent_count, rel_count) =
2453 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2454
2455 Ok(EnrichItemResult::Done {
2456 memory_id: Some(memory_id),
2457 entity_id: None,
2458 entities: ent_count,
2459 rels: rel_count,
2460 chars_before: None,
2461 chars_after: None,
2462 cost,
2463 is_oauth,
2464 })
2465}
2466
2467fn call_entity_description(
2468 conn: &Connection,
2469 namespace: &str,
2470 entity_name: &str,
2471 binary: &Path,
2472 model: Option<&str>,
2473 timeout: u64,
2474 mode: &EnrichMode,
2475) -> Result<EnrichItemResult, AppError> {
2476 let (entity_id, entity_type): (i64, String) = conn
2477 .query_row(
2478 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2479 rusqlite::params![namespace, entity_name],
2480 |r| Ok((r.get(0)?, r.get(1)?)),
2481 )
2482 .map_err(|e| match e {
2483 rusqlite::Error::QueryReturnedNoRows => {
2484 AppError::NotFound(format!("entity '{entity_name}' not found"))
2485 }
2486 other => AppError::Database(other),
2487 })?;
2488
2489 let prompt = format!(
2490 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2491 );
2492
2493 let (value, cost, is_oauth) = match mode {
2494 EnrichMode::ClaudeCode => call_claude(
2495 binary,
2496 &prompt,
2497 ENTITY_DESCRIPTION_SCHEMA,
2498 "",
2499 model,
2500 timeout,
2501 )?,
2502 EnrichMode::Codex => call_codex(
2503 binary,
2504 &prompt,
2505 ENTITY_DESCRIPTION_SCHEMA,
2506 "",
2507 model,
2508 timeout,
2509 )?,
2510 };
2511
2512 let description = value
2513 .get("description")
2514 .and_then(|v| v.as_str())
2515 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2516
2517 persist_entity_description(conn, entity_id, description)?;
2518
2519 Ok(EnrichItemResult::Done {
2520 memory_id: None,
2521 entity_id: Some(entity_id),
2522 entities: 0,
2523 rels: 0,
2524 chars_before: None,
2525 chars_after: None,
2526 cost,
2527 is_oauth,
2528 })
2529}
2530
2531#[allow(clippy::too_many_arguments)]
2532fn call_body_enrich(
2533 conn: &Connection,
2534 namespace: &str,
2535 memory_name: &str,
2536 binary: &Path,
2537 model: Option<&str>,
2538 timeout: u64,
2539 mode: &EnrichMode,
2540 min_output_chars: usize,
2541 max_output_chars: usize,
2542 prompt_template: Option<&Path>,
2543 preserve_threshold: f64,
2544 paths: &crate::paths::AppPaths,
2545) -> Result<EnrichItemResult, AppError> {
2546 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2547 .query_row(
2548 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2549 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2550 rusqlite::params![namespace, memory_name],
2551 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2552 )
2553 .map_err(|e| match e {
2554 rusqlite::Error::QueryReturnedNoRows => {
2555 AppError::NotFound(format!("memory '{memory_name}' not found"))
2556 }
2557 other => AppError::Database(other),
2558 })?;
2559
2560 let chars_before = body.chars().count();
2561
2562 let linked_entities: Vec<String> = {
2564 let mut stmt = conn.prepare_cached(
2565 "SELECT e.name FROM memory_entities me \
2566 JOIN entities e ON e.id = me.entity_id \
2567 WHERE me.memory_id = ?1 LIMIT 10",
2568 )?;
2569 let result: Vec<String> = stmt
2570 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2571 .filter_map(|r| r.ok())
2572 .collect();
2573 drop(stmt);
2574 result
2575 };
2576
2577 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2579 let file_size = std::fs::metadata(tmpl_path)
2580 .map_err(|e| {
2581 AppError::Io(std::io::Error::new(
2582 e.kind(),
2583 format!("failed to stat prompt template: {e}"),
2584 ))
2585 })?
2586 .len();
2587 if file_size > MAX_MEMORY_BODY_LEN as u64 {
2588 return Err(AppError::LimitExceeded(
2589 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2590 ));
2591 }
2592 std::fs::read_to_string(tmpl_path).map_err(|e| {
2593 AppError::Io(std::io::Error::new(
2594 e.kind(),
2595 format!("failed to read prompt template: {e}"),
2596 ))
2597 })?
2598 } else {
2599 BODY_ENRICH_PROMPT_PREFIX.to_string()
2600 };
2601
2602 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2604 let mut ctx = String::new();
2605 ctx.push_str(&format!(
2606 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2607 ));
2608 if !description.is_empty() {
2609 ctx.push_str(&format!("- Description: {description}\n"));
2610 }
2611 ctx.push_str(&format!("- Domain: {namespace}\n"));
2612 if !linked_entities.is_empty() {
2613 ctx.push_str(&format!(
2614 "- Linked entities: {}\n",
2615 linked_entities.join(", ")
2616 ));
2617 }
2618 ctx
2619 } else {
2620 String::new()
2621 };
2622
2623 let prompt = format!(
2624 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2625 );
2626
2627 let (value, cost, is_oauth) = match mode {
2629 EnrichMode::ClaudeCode => {
2630 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2631 }
2632 EnrichMode::Codex => {
2633 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2634 }
2635 };
2636
2637 let enriched_body = value
2638 .get("enriched_body")
2639 .and_then(|v| v.as_str())
2640 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2641
2642 let chars_after = enriched_body.chars().count();
2643
2644 let threshold = preserve_threshold;
2651 let verdict =
2652 crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2653 if !verdict.is_accepted() {
2654 return Ok(EnrichItemResult::PreservationFailed {
2655 score: match verdict {
2656 crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2657 crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2658 crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2659 },
2660 threshold,
2661 chars_before,
2662 chars_after,
2663 });
2664 }
2665
2666 let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2672 let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2673 if old_hash == new_hash {
2674 return Ok(EnrichItemResult::Skipped {
2675 reason: format!(
2676 "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2677 ),
2678 });
2679 }
2680
2681 if chars_after <= chars_before {
2683 return Ok(EnrichItemResult::Skipped {
2684 reason: format!(
2685 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2686 ),
2687 });
2688 }
2689
2690 persist_enriched_body(
2691 conn,
2692 namespace,
2693 memory_id,
2694 memory_name,
2695 enriched_body,
2696 paths,
2697 )?;
2698
2699 Ok(EnrichItemResult::Done {
2700 memory_id: Some(memory_id),
2701 entity_id: None,
2702 entities: 0,
2703 rels: 0,
2704 chars_before: Some(chars_before),
2705 chars_after: Some(chars_after),
2706 cost,
2707 is_oauth,
2708 })
2709}
2710
2711fn call_reembed(
2712 conn: &Connection,
2713 namespace: &str,
2714 memory_name: &str,
2715 paths: &crate::paths::AppPaths,
2716) -> Result<EnrichItemResult, AppError> {
2717 let (memory_id, body, memory_type): (i64, String, String) = conn
2718 .query_row(
2719 "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2720 FROM memories
2721 WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2722 rusqlite::params![namespace, memory_name],
2723 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2724 )
2725 .map_err(|e| match e {
2726 rusqlite::Error::QueryReturnedNoRows => {
2727 AppError::NotFound(format!("memory '{memory_name}' not found"))
2728 }
2729 other => AppError::Database(other),
2730 })?;
2731
2732 if body.trim().is_empty() {
2733 return Ok(EnrichItemResult::Skipped {
2734 reason: "body is empty".to_string(),
2735 });
2736 }
2737
2738 reembed_memory_vector(
2739 conn,
2740 namespace,
2741 memory_id,
2742 memory_name,
2743 &memory_type,
2744 &body,
2745 paths,
2746 )?;
2747
2748 Ok(EnrichItemResult::Done {
2749 memory_id: Some(memory_id),
2750 entity_id: None,
2751 entities: 0,
2752 rels: 0,
2753 chars_before: Some(body.chars().count()),
2754 chars_after: Some(body.chars().count()),
2755 cost: 0.0,
2756 is_oauth: true,
2757 })
2758}
2759
2760fn scan_operation(
2765 conn: &Connection,
2766 namespace: &str,
2767 args: &EnrichArgs,
2768) -> Result<Vec<String>, AppError> {
2769 let name_filter = resolve_name_filter(args)?;
2771 match args.operation {
2772 EnrichOperation::MemoryBindings => {
2773 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2774 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2775 }
2776 EnrichOperation::EntityDescriptions => {
2777 let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2778 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2779 }
2780 EnrichOperation::BodyEnrich => {
2781 let rows =
2782 scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2783 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2784 }
2785 EnrichOperation::ReEmbed => {
2786 let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
2787 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2788 }
2789 EnrichOperation::WeightCalibrate => {
2790 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2791 Ok(rows
2792 .into_iter()
2793 .map(|(id, _, _, _, _)| id.to_string())
2794 .collect())
2795 }
2796 EnrichOperation::RelationReclassify => {
2797 let rows = scan_generic_relations(conn, namespace, args.limit)?;
2798 Ok(rows
2799 .into_iter()
2800 .map(|(id, _, _, _)| id.to_string())
2801 .collect())
2802 }
2803 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2804 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2805 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2806 }
2807 EnrichOperation::EntityTypeValidate => {
2808 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2809 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2810 }
2811 EnrichOperation::DescriptionEnrich => {
2812 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2813 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2814 }
2815 EnrichOperation::DomainClassify
2816 | EnrichOperation::GraphAudit
2817 | EnrichOperation::DeepResearchSynth
2818 | EnrichOperation::BodyExtract => {
2819 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2820 let sql = format!(
2821 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2822 );
2823 let mut stmt = conn.prepare(&sql)?;
2824 let names = stmt
2825 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2826 .collect::<Result<Vec<_>, _>>()?;
2827 Ok(names)
2828 }
2829 }
2830}
2831
2832fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2838 if let Some(p) = explicit {
2839 if p.exists() {
2840 return Ok(p.to_path_buf());
2841 }
2842 return Err(AppError::Validation(format!(
2843 "Codex binary not found at explicit path: {}",
2844 p.display()
2845 )));
2846 }
2847
2848 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2849 let p = PathBuf::from(&env_path);
2850 if p.exists() {
2851 return Ok(p);
2852 }
2853 }
2854
2855 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2856 if let Some(path_var) = std::env::var_os("PATH") {
2857 for dir in std::env::split_paths(&path_var) {
2858 let candidate = dir.join(name);
2859 if candidate.exists() {
2860 return Ok(crate::extract::llm_embedding::resolve_real_binary(
2861 &candidate,
2862 ));
2863 }
2864 }
2865 }
2866
2867 Err(AppError::Validation(
2868 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2869 ))
2870}
2871
2872fn call_weight_calibrate(
2874 conn: &Connection,
2875 _namespace: &str,
2876 item_key: &str,
2877 binary: &Path,
2878 model: Option<&str>,
2879 timeout: u64,
2880 mode: &EnrichMode,
2881) -> Result<EnrichItemResult, AppError> {
2882 let rel_id: i64 = item_key
2883 .parse()
2884 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2885 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2886 .query_row(
2887 "SELECT e1.name, e2.name, r.relation, r.weight \
2888 FROM relationships r \
2889 JOIN entities e1 ON e1.id = r.source_id \
2890 JOIN entities e2 ON e2.id = r.target_id \
2891 WHERE r.id = ?1",
2892 rusqlite::params![rel_id],
2893 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2894 )
2895 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2896
2897 let input_text = format!(
2898 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2899 );
2900 let (value, cost, is_oauth) = match mode {
2901 EnrichMode::ClaudeCode => call_claude(
2902 binary,
2903 WEIGHT_CALIBRATE_PROMPT,
2904 WEIGHT_CALIBRATE_SCHEMA,
2905 &input_text,
2906 model,
2907 timeout,
2908 )?,
2909 EnrichMode::Codex => call_codex(
2910 binary,
2911 WEIGHT_CALIBRATE_PROMPT,
2912 WEIGHT_CALIBRATE_SCHEMA,
2913 &input_text,
2914 model,
2915 timeout,
2916 )?,
2917 };
2918
2919 let calibrated = value
2920 .get("calibrated_weight")
2921 .and_then(|v| v.as_f64())
2922 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2923
2924 conn.execute(
2925 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2926 rusqlite::params![calibrated, rel_id],
2927 )?;
2928
2929 Ok(EnrichItemResult::Done {
2930 memory_id: None,
2931 entity_id: None,
2932 entities: 0,
2933 rels: 1,
2934 chars_before: None,
2935 chars_after: None,
2936 cost,
2937 is_oauth,
2938 })
2939}
2940
2941fn call_relation_reclassify(
2943 conn: &Connection,
2944 _namespace: &str,
2945 item_key: &str,
2946 binary: &Path,
2947 model: Option<&str>,
2948 timeout: u64,
2949 mode: &EnrichMode,
2950) -> Result<EnrichItemResult, AppError> {
2951 let rel_id: i64 = item_key
2952 .parse()
2953 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2954 let (source_name, target_name, current_relation): (String, String, String) = conn
2955 .query_row(
2956 "SELECT e1.name, e2.name, r.relation \
2957 FROM relationships r \
2958 JOIN entities e1 ON e1.id = r.source_id \
2959 JOIN entities e2 ON e2.id = r.target_id \
2960 WHERE r.id = ?1",
2961 rusqlite::params![rel_id],
2962 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2963 )
2964 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2965
2966 let input_text = format!(
2967 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
2968 );
2969 let (value, cost, is_oauth) = match mode {
2970 EnrichMode::ClaudeCode => call_claude(
2971 binary,
2972 RELATION_RECLASSIFY_PROMPT,
2973 RELATION_RECLASSIFY_SCHEMA,
2974 &input_text,
2975 model,
2976 timeout,
2977 )?,
2978 EnrichMode::Codex => call_codex(
2979 binary,
2980 RELATION_RECLASSIFY_PROMPT,
2981 RELATION_RECLASSIFY_SCHEMA,
2982 &input_text,
2983 model,
2984 timeout,
2985 )?,
2986 };
2987
2988 let new_relation = value
2989 .get("relation")
2990 .and_then(|v| v.as_str())
2991 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
2992 let new_strength = value
2993 .get("strength")
2994 .and_then(|v| v.as_f64())
2995 .unwrap_or(0.5);
2996
2997 conn.execute(
2998 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
2999 rusqlite::params![new_relation, new_strength, rel_id],
3000 )?;
3001
3002 Ok(EnrichItemResult::Done {
3003 memory_id: None,
3004 entity_id: None,
3005 entities: 0,
3006 rels: 1,
3007 chars_before: None,
3008 chars_after: None,
3009 cost,
3010 is_oauth,
3011 })
3012}
3013
3014fn call_entity_connect(
3016 conn: &Connection,
3017 namespace: &str,
3018 item_key: &str,
3019 binary: &Path,
3020 model: Option<&str>,
3021 timeout: u64,
3022 mode: &EnrichMode,
3023) -> Result<EnrichItemResult, AppError> {
3024 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3025 let (e1_id, e1_name, e2_id, e2_name) =
3026 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3027 Some(p) => p,
3028 None => {
3029 return Ok(EnrichItemResult::Skipped {
3030 reason: "pair no longer isolated".into(),
3031 })
3032 }
3033 };
3034 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3035 let (value, cost, is_oauth) = match mode {
3036 EnrichMode::ClaudeCode => call_claude(
3037 binary,
3038 ENTITY_CONNECT_PROMPT,
3039 ENTITY_CONNECT_SCHEMA,
3040 &input_text,
3041 model,
3042 timeout,
3043 )?,
3044 EnrichMode::Codex => call_codex(
3045 binary,
3046 ENTITY_CONNECT_PROMPT,
3047 ENTITY_CONNECT_SCHEMA,
3048 &input_text,
3049 model,
3050 timeout,
3051 )?,
3052 };
3053 let relation = value
3054 .get("relation")
3055 .and_then(|v| v.as_str())
3056 .unwrap_or("none");
3057 if relation == "none" {
3058 return Ok(EnrichItemResult::Skipped {
3059 reason: "LLM determined no relationship".into(),
3060 });
3061 }
3062 let strength = value
3063 .get("strength")
3064 .and_then(|v| v.as_f64())
3065 .unwrap_or(0.5);
3066 conn.execute(
3067 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3068 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3069 )?;
3070 Ok(EnrichItemResult::Done {
3071 memory_id: None,
3072 entity_id: None,
3073 entities: 0,
3074 rels: 1,
3075 chars_before: None,
3076 chars_after: None,
3077 cost,
3078 is_oauth,
3079 })
3080}
3081
3082fn call_entity_type_validate(
3084 conn: &Connection,
3085 _namespace: &str,
3086 item_key: &str,
3087 binary: &Path,
3088 model: Option<&str>,
3089 timeout: u64,
3090 mode: &EnrichMode,
3091) -> Result<EnrichItemResult, AppError> {
3092 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3093 .query_row(
3094 "SELECT id, name, type FROM entities WHERE name = ?1",
3095 rusqlite::params![item_key],
3096 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3097 )
3098 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3099 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3100 let (value, cost, is_oauth) = match mode {
3101 EnrichMode::ClaudeCode => call_claude(
3102 binary,
3103 ENTITY_TYPE_VALIDATE_PROMPT,
3104 ENTITY_TYPE_VALIDATE_SCHEMA,
3105 &input_text,
3106 model,
3107 timeout,
3108 )?,
3109 EnrichMode::Codex => call_codex(
3110 binary,
3111 ENTITY_TYPE_VALIDATE_PROMPT,
3112 ENTITY_TYPE_VALIDATE_SCHEMA,
3113 &input_text,
3114 model,
3115 timeout,
3116 )?,
3117 };
3118 let validated_type = value
3119 .get("validated_type")
3120 .and_then(|v| v.as_str())
3121 .unwrap_or(&ent_type);
3122 let was_correct = value
3123 .get("was_correct")
3124 .and_then(|v| v.as_bool())
3125 .unwrap_or(true);
3126 if !was_correct {
3127 conn.execute(
3128 "UPDATE entities SET type = ?1 WHERE id = ?2",
3129 rusqlite::params![validated_type, ent_id],
3130 )?;
3131 }
3132 Ok(EnrichItemResult::Done {
3133 memory_id: None,
3134 entity_id: Some(ent_id),
3135 entities: 1,
3136 rels: 0,
3137 chars_before: None,
3138 chars_after: None,
3139 cost,
3140 is_oauth,
3141 })
3142}
3143
3144fn call_description_enrich(
3146 conn: &Connection,
3147 _namespace: &str,
3148 item_key: &str,
3149 binary: &Path,
3150 model: Option<&str>,
3151 timeout: u64,
3152 mode: &EnrichMode,
3153) -> Result<EnrichItemResult, AppError> {
3154 let (mem_id, body, old_desc): (i64, String, String) = conn
3155 .query_row(
3156 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3157 rusqlite::params![item_key],
3158 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3159 )
3160 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3161 let snippet: String = body.chars().take(500).collect();
3162 let input_text = format!(
3163 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3164 );
3165 let (value, cost, is_oauth) = match mode {
3166 EnrichMode::ClaudeCode => call_claude(
3167 binary,
3168 DESCRIPTION_ENRICH_PROMPT,
3169 DESCRIPTION_ENRICH_SCHEMA,
3170 &input_text,
3171 model,
3172 timeout,
3173 )?,
3174 EnrichMode::Codex => call_codex(
3175 binary,
3176 DESCRIPTION_ENRICH_PROMPT,
3177 DESCRIPTION_ENRICH_SCHEMA,
3178 &input_text,
3179 model,
3180 timeout,
3181 )?,
3182 };
3183 let new_desc = value
3184 .get("description")
3185 .and_then(|v| v.as_str())
3186 .unwrap_or(&old_desc);
3187 conn.execute(
3188 "UPDATE memories SET description = ?1 WHERE id = ?2",
3189 rusqlite::params![new_desc, mem_id],
3190 )?;
3191 Ok(EnrichItemResult::Done {
3192 memory_id: Some(mem_id),
3193 entity_id: None,
3194 entities: 0,
3195 rels: 0,
3196 chars_before: Some(old_desc.len()),
3197 chars_after: Some(new_desc.len()),
3198 cost,
3199 is_oauth,
3200 })
3201}
3202
3203fn call_domain_classify(
3205 conn: &Connection,
3206 _namespace: &str,
3207 item_key: &str,
3208 binary: &Path,
3209 model: Option<&str>,
3210 timeout: u64,
3211 mode: &EnrichMode,
3212) -> Result<EnrichItemResult, AppError> {
3213 let (mem_id, body, desc): (i64, String, String) = conn
3214 .query_row(
3215 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3216 rusqlite::params![item_key],
3217 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3218 )
3219 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3220 let snippet: String = body.chars().take(500).collect();
3221 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3222 let (value, cost, is_oauth) = match mode {
3223 EnrichMode::ClaudeCode => call_claude(
3224 binary,
3225 DOMAIN_CLASSIFY_PROMPT,
3226 DOMAIN_CLASSIFY_SCHEMA,
3227 &input_text,
3228 model,
3229 timeout,
3230 )?,
3231 EnrichMode::Codex => call_codex(
3232 binary,
3233 DOMAIN_CLASSIFY_PROMPT,
3234 DOMAIN_CLASSIFY_SCHEMA,
3235 &input_text,
3236 model,
3237 timeout,
3238 )?,
3239 };
3240 let domain = value
3241 .get("domain")
3242 .and_then(|v| v.as_str())
3243 .unwrap_or("uncategorized");
3244 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3245 conn.execute(
3246 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3247 rusqlite::params![metadata, mem_id],
3248 )?;
3249 Ok(EnrichItemResult::Done {
3250 memory_id: Some(mem_id),
3251 entity_id: None,
3252 entities: 0,
3253 rels: 0,
3254 chars_before: None,
3255 chars_after: None,
3256 cost,
3257 is_oauth,
3258 })
3259}
3260
3261fn call_graph_audit(
3263 conn: &Connection,
3264 _namespace: &str,
3265 item_key: &str,
3266 binary: &Path,
3267 model: Option<&str>,
3268 timeout: u64,
3269 mode: &EnrichMode,
3270) -> Result<EnrichItemResult, AppError> {
3271 let (mem_id, body, desc): (i64, String, String) = conn
3272 .query_row(
3273 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3274 rusqlite::params![item_key],
3275 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3276 )
3277 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3278 let snippet: String = body.chars().take(500).collect();
3279 let ent_count: i64 = conn
3280 .query_row(
3281 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3282 rusqlite::params![mem_id],
3283 |r| r.get(0),
3284 )
3285 .unwrap_or(0);
3286 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3287 let (value, cost, is_oauth) = match mode {
3288 EnrichMode::ClaudeCode => call_claude(
3289 binary,
3290 GRAPH_AUDIT_PROMPT,
3291 GRAPH_AUDIT_SCHEMA,
3292 &input_text,
3293 model,
3294 timeout,
3295 )?,
3296 EnrichMode::Codex => call_codex(
3297 binary,
3298 GRAPH_AUDIT_PROMPT,
3299 GRAPH_AUDIT_SCHEMA,
3300 &input_text,
3301 model,
3302 timeout,
3303 )?,
3304 };
3305 let issues = value
3306 .get("issues")
3307 .and_then(|v| v.as_array())
3308 .map(|a| a.len())
3309 .unwrap_or(0);
3310 Ok(EnrichItemResult::Done {
3311 memory_id: Some(mem_id),
3312 entity_id: None,
3313 entities: 0,
3314 rels: issues,
3315 chars_before: None,
3316 chars_after: None,
3317 cost,
3318 is_oauth,
3319 })
3320}
3321
3322fn call_deep_research_synth(
3324 conn: &Connection,
3325 namespace: &str,
3326 item_key: &str,
3327 binary: &Path,
3328 model: Option<&str>,
3329 timeout: u64,
3330 mode: &EnrichMode,
3331) -> Result<EnrichItemResult, AppError> {
3332 let (mem_id, body): (i64, String) = conn
3333 .query_row(
3334 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3335 rusqlite::params![item_key],
3336 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3337 )
3338 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3339 let snippet: String = body.chars().take(2000).collect();
3340 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3341 let (value, cost, is_oauth) = match mode {
3342 EnrichMode::ClaudeCode => call_claude(
3343 binary,
3344 DEEP_RESEARCH_SYNTH_PROMPT,
3345 DEEP_RESEARCH_SYNTH_SCHEMA,
3346 &input_text,
3347 model,
3348 timeout,
3349 )?,
3350 EnrichMode::Codex => call_codex(
3351 binary,
3352 DEEP_RESEARCH_SYNTH_PROMPT,
3353 DEEP_RESEARCH_SYNTH_SCHEMA,
3354 &input_text,
3355 model,
3356 timeout,
3357 )?,
3358 };
3359 let mut ent_count = 0usize;
3360 let mut rel_count = 0usize;
3361 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3362 for e in ents {
3363 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3364 let etype_str = e
3365 .get("entity_type")
3366 .and_then(|v| v.as_str())
3367 .unwrap_or("concept");
3368 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3369 if name.len() >= 2 {
3370 let ne = NewEntity {
3371 name: name.to_string(),
3372 entity_type: etype,
3373 description: None,
3374 };
3375 let _ = entities::upsert_entity(conn, namespace, &ne);
3376 ent_count += 1;
3377 }
3378 }
3379 }
3380 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3381 for r in rels {
3382 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3383 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3384 if src.is_empty() || tgt.is_empty() {
3385 continue;
3386 }
3387 let rel = r
3388 .get("relation")
3389 .and_then(|v| v.as_str())
3390 .unwrap_or("related");
3391 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3392 if let (Some(sid), Some(tid)) = (
3393 entities::find_entity_id(conn, namespace, src)?,
3394 entities::find_entity_id(conn, namespace, tgt)?,
3395 ) {
3396 let _ = entities::create_or_fetch_relationship(
3397 conn, namespace, sid, tid, rel, str_, None,
3398 );
3399 rel_count += 1;
3400 }
3401 }
3402 }
3403 Ok(EnrichItemResult::Done {
3404 memory_id: Some(mem_id),
3405 entity_id: None,
3406 entities: ent_count,
3407 rels: rel_count,
3408 chars_before: None,
3409 chars_after: None,
3410 cost,
3411 is_oauth,
3412 })
3413}
3414
3415fn call_body_extract(
3417 conn: &Connection,
3418 _namespace: &str,
3419 item_key: &str,
3420 binary: &Path,
3421 model: Option<&str>,
3422 timeout: u64,
3423 mode: &EnrichMode,
3424) -> Result<EnrichItemResult, AppError> {
3425 let (mem_id, body): (i64, String) = conn
3426 .query_row(
3427 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3428 rusqlite::params![item_key],
3429 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3430 )
3431 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3432 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3433 let (value, cost, is_oauth) = match mode {
3434 EnrichMode::ClaudeCode => call_claude(
3435 binary,
3436 BODY_EXTRACT_PROMPT,
3437 BODY_EXTRACT_SCHEMA,
3438 &input_text,
3439 model,
3440 timeout,
3441 )?,
3442 EnrichMode::Codex => call_codex(
3443 binary,
3444 BODY_EXTRACT_PROMPT,
3445 BODY_EXTRACT_SCHEMA,
3446 &input_text,
3447 model,
3448 timeout,
3449 )?,
3450 };
3451 let restructured = value
3452 .get("restructured_body")
3453 .and_then(|v| v.as_str())
3454 .unwrap_or(&body);
3455 let chars_before = body.len();
3456 let chars_after = restructured.len();
3457 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3458 conn.execute(
3459 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3460 rusqlite::params![restructured, new_hash, mem_id],
3461 )?;
3462 Ok(EnrichItemResult::Done {
3463 memory_id: Some(mem_id),
3464 entity_id: None,
3465 entities: 0,
3466 rels: 0,
3467 chars_before: Some(chars_before),
3468 chars_after: Some(chars_after),
3469 cost,
3470 is_oauth,
3471 })
3472}
3473
3474#[allow(clippy::type_complexity)]
3476fn scan_isolated_entity_pairs(
3477 conn: &Connection,
3478 namespace: &str,
3479 limit: Option<usize>,
3480) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3481 let limit_val = limit.unwrap_or(50) as i64;
3482 let mut stmt = conn.prepare_cached(
3483 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3484 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3485 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3486 (r.source_id = e1.id AND r.target_id = e2.id) OR \
3487 (r.source_id = e2.id AND r.target_id = e1.id)) \
3488 LIMIT ?2",
3489 )?;
3490 let rows = stmt
3491 .query_map(rusqlite::params![namespace, limit_val], |r| {
3492 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3493 })?
3494 .collect::<Result<Vec<_>, _>>()?;
3495 Ok(rows)
3496}
3497
3498fn scan_entities_for_type_validation(
3500 conn: &Connection,
3501 namespace: &str,
3502 limit: Option<usize>,
3503) -> Result<Vec<(i64, String, String)>, AppError> {
3504 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3505 let sql = format!(
3506 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3507 );
3508 let mut stmt = conn.prepare(&sql)?;
3509 let rows = stmt
3510 .query_map(rusqlite::params![namespace], |r| {
3511 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3512 })?
3513 .collect::<Result<Vec<_>, _>>()?;
3514 Ok(rows)
3515}
3516
3517fn scan_generic_descriptions(
3519 conn: &Connection,
3520 namespace: &str,
3521 limit: Option<usize>,
3522) -> Result<Vec<(i64, String, String)>, AppError> {
3523 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3524 let sql = format!(
3525 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3526 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3527 ORDER BY id {limit_clause}"
3528 );
3529 let mut stmt = conn.prepare(&sql)?;
3530 let rows = stmt
3531 .query_map(rusqlite::params![namespace], |r| {
3532 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3533 })?
3534 .collect::<Result<Vec<_>, _>>()?;
3535 Ok(rows)
3536}
3537
3538fn call_codex(
3542 binary: &Path,
3543 prompt: &str,
3544 json_schema: &str,
3545 input_text: &str,
3546 model: Option<&str>,
3547 timeout_secs: u64,
3548) -> Result<(serde_json::Value, f64, bool), AppError> {
3549 use wait_timeout::ChildExt;
3550
3551 super::codex_spawn::validate_codex_model(model)?;
3556 let schema_file = super::codex_spawn::trusted_schema_path()?;
3557
3558 let args = super::codex_spawn::CodexSpawnArgs {
3559 binary,
3560 prompt,
3561 json_schema,
3562 input_text,
3563 model,
3564 timeout_secs,
3565 schema_path: schema_file.clone(),
3566 };
3567 let mut cmd = super::codex_spawn::build_codex_command(&args);
3568
3569 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3570 AppError::Io(std::io::Error::new(
3571 e.kind(),
3572 format!("failed to spawn codex: {e}"),
3573 ))
3574 })?;
3575
3576 let full_prompt = format!("{prompt}\n\n{input_text}");
3577 let stdin_bytes = full_prompt.into_bytes();
3578 let mut child_stdin = child
3579 .stdin
3580 .take()
3581 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3582 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3583 child_stdin.write_all(&stdin_bytes)?;
3584 drop(child_stdin);
3585 Ok(())
3586 });
3587
3588 let start = std::time::Instant::now();
3589 let timeout = std::time::Duration::from_secs(timeout_secs);
3590 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3591 let _ = std::fs::remove_file(&schema_file);
3592
3593 match status {
3594 Some(exit_status) => {
3595 stdin_thread
3596 .join()
3597 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3598 .map_err(AppError::Io)?;
3599
3600 tracing::debug!(
3601 target: "process",
3602 exit_code = ?exit_status.code(),
3603 elapsed_ms = start.elapsed().as_millis() as u64,
3604 "external process completed"
3605 );
3606
3607 let mut stdout_buf = Vec::new();
3608 if let Some(mut out) = child.stdout.take() {
3609 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3610 }
3611 if !exit_status.success() {
3612 let mut stderr_buf = Vec::new();
3613 if let Some(mut err) = child.stderr.take() {
3614 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3615 }
3616 let stderr_str = String::from_utf8_lossy(&stderr_buf);
3617 tracing::warn!(
3618 target: "enrich",
3619 exit_code = ?exit_status.code(),
3620 stderr = %stderr_str.trim(),
3621 "codex process failed"
3622 );
3623 return Err(AppError::Validation(format!(
3624 "codex exited with code {:?}: {}",
3625 exit_status.code(),
3626 stderr_str.trim()
3627 )));
3628 }
3629 let stdout_str = String::from_utf8(stdout_buf)
3630 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3631 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3634 let value: serde_json::Value =
3640 serde_json::from_str(&result.last_agent_text).map_err(|e| {
3641 AppError::Validation(format!(
3642 "codex agent_message is not valid JSON: {e}; raw={}",
3643 result.last_agent_text
3644 ))
3645 })?;
3646 Ok((value, 0.0, false))
3647 }
3648 None => {
3649 let _ = child.kill();
3650 let _ = child.wait();
3651 let _ = stdin_thread.join();
3652 Err(AppError::Validation(format!(
3653 "codex timed out after {timeout_secs} seconds"
3654 )))
3655 }
3656 }
3657}
3658
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory database containing the subset of tables these tests touch.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL DEFAULT '',
                dim INTEGER NOT NULL DEFAULT 384,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
            [],
        )
        .unwrap();
        let memory_id: i64 = conn
            .query_row(
                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
            [],
        )
        .unwrap();

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    #[test]
    fn queue_db_schema_creates_correctly() {
        // A per-test temp dir keeps this portable (no hard-coded /tmp) and is cleaned up
        // even when an assertion below fails. `conn` is declared after `tmp`, so it is
        // dropped (and the file closed) before the directory is removed.
        let tmp = tempfile::tempdir().expect("tempdir");
        let tmp_path = tmp
            .path()
            .join("test-enrich-queue.sqlite")
            .to_string_lossy()
            .into_owned();
        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    // NOTE(review): despite its name, this test only checks the short-body scan that a dry
    // run would preview; it never exercises the dry-run code path itself.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}