1use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::time::Instant;
38
/// File name of the SQLite queue database.
/// NOTE(review): presumably the default path handed to `open_queue_db` — confirm at the call site.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default rate-limit wait.
/// NOTE(review): the unit is presumably seconds — confirm where this constant is consumed.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of `--min-output-chars` (`EnrichArgs::min_output_chars`).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of `--max-output-chars` (`EnrichArgs::max_output_chars`).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
47
/// JSON Schema for an entity/relationship extraction response.
///
/// The root object requires two arrays:
/// - `entities`: objects with `name` and an `entity_type` restricted to a fixed enum;
/// - `relationships`: objects with `source`, `target`, a `relation` restricted to a
///   fixed enum, and a `strength` between 0 and 1.
///
/// Every object level sets `additionalProperties: false`.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
91
/// JSON Schema for an entity-description response: a single required
/// string field `description`, no additional properties.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for a body-enrichment response: a single required
/// string field `enriched_body`, no additional properties.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
109
/// Prompt asking the model to judge whether a relationship weight is well
/// calibrated against a four-step scale (0.9 / 0.7 / 0.5 / 0.3).
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibration response: `calibrated_weight`
/// (number in [0, 1]) and `reasoning` (string), both required.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
128
/// Prompt asking the model to replace a generic relationship type with one
/// of the canonical relations listed in the prompt text.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the reclassification response: `relation` (string),
/// `strength` (number in [0, 1]) and `reasoning` (string), all required.
///
/// Note that `relation` is a free-form string here; the schema itself does
/// not restrict it to the canonical list named in the prompt.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
155
/// Prompt asking the model whether two currently unconnected entities
/// should be linked; the model answers `"none"` when no relationship exists.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect response: `relation` (free-form
/// string, may be `"none"` per the prompt), `strength` (number in [0, 1])
/// and `reasoning` (string), all required.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
172
/// Prompt asking the model to confirm or correct an entity's type against
/// the list of valid entity types named in the prompt text.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for the type-validation response: `validated_type`
/// (free-form string), `was_correct` (boolean) and `reasoning` (string),
/// all required.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
189
/// Prompt asking the model to rewrite a generic or auto-generated memory
/// description as a concise 10-20 word semantic description.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrichment response: `description` and
/// `reasoning`, both required strings.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
205
/// Prompt asking the model to assign a memory to a primary domain, named
/// in kebab-case with 2-4 words.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classification response: `domain` (string),
/// `confidence` (number in [0, 1]) and `reasoning` (string), all required.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
220
/// Prompt asking the model to audit a memory and its entity bindings and
/// report quality issues together with an overall quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema for the graph-audit response: `quality_score` (number in
/// [0, 1]), `issues` (array of `{ kind, detail }` objects) and `reasoning`.
///
/// Every object level, including the nested issue items, sets
/// `additionalProperties: false`. This matches `BINDINGS_SCHEMA` and keeps
/// the schema acceptable to strict structured-output validators, which
/// require that flag on every object.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
236
/// Prompt asking the model to synthesize a memory body into entities,
/// relationships and a summary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema for the synthesis response: `entities` (array of
/// `{ name, entity_type }`), `relationships` (array of
/// `{ source, target, relation, strength }`) and `summary`.
///
/// Every object level, including the nested array items, sets
/// `additionalProperties: false`. This matches `BINDINGS_SCHEMA` and keeps
/// the schema acceptable to strict structured-output validators, which
/// require that flag on every object.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": { "type": "string" }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": { "type": "string" },
          "strength": { "type": "number" }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
253
/// Prompt asking the model to restructure an unstructured memory body into
/// clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extract response: `restructured_body` and
/// `changes_summary`, both required strings.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
268
/// Prompt for entity/relationship extraction from a memory body. The
/// relation names and strength scale listed here mirror the `relation`
/// enum and `strength` range in `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Leading part of the entity-description prompt. It ends with
/// `"Entity name: "`, so the caller is expected to append the entity name.
/// NOTE(review): how the entity type is appended is not visible here.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading part of the body-enrichment prompt. It ends with
/// `"Memory body to enrich:\n\n"`, so the caller is expected to append the
/// memory body.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
287
// Operation selected with `--operation` (see `EnrichArgs::operation`).
//
// Plain `//` comments are used on purpose: clap's derive macros turn `///`
// doc comments into user-visible help text.
// serde (de)serializes variants in kebab-case, e.g. `memory-bindings`.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Command help example: "Add missing entity bindings to all unbound memories".
    MemoryBindings,
    // Command help example: "Fill entity descriptions".
    EntityDescriptions,
    // Command help example: "Expand short memory bodies".
    BodyEnrich,
    // Command help example: "Rebuild only missing memory embeddings without rewriting bodies".
    ReEmbed,
    // NOTE(review): presumably paired with WEIGHT_CALIBRATE_PROMPT / _SCHEMA — confirm in the dispatcher.
    WeightCalibrate,
    // NOTE(review): presumably paired with RELATION_RECLASSIFY_PROMPT / _SCHEMA.
    RelationReclassify,
    // NOTE(review): presumably paired with ENTITY_CONNECT_PROMPT / _SCHEMA.
    EntityConnect,
    // NOTE(review): presumably paired with ENTITY_TYPE_VALIDATE_PROMPT / _SCHEMA.
    EntityTypeValidate,
    // NOTE(review): presumably paired with DESCRIPTION_ENRICH_PROMPT / _SCHEMA.
    DescriptionEnrich,
    // No prompt or schema for this variant is visible in this part of the file.
    CrossDomainBridges,
    // NOTE(review): presumably paired with DOMAIN_CLASSIFY_PROMPT / _SCHEMA.
    DomainClassify,
    // NOTE(review): presumably paired with GRAPH_AUDIT_PROMPT / _SCHEMA.
    GraphAudit,
    // NOTE(review): presumably paired with DEEP_RESEARCH_SYNTH_PROMPT / _SCHEMA.
    DeepResearchSynth,
    // NOTE(review): presumably paired with BODY_EXTRACT_PROMPT / _SCHEMA.
    BodyExtract,
}
325
// LLM provider selected with `--mode` (and optionally `--fallback-mode`).
//
// Plain `//` comments are used on purpose: clap's derive macros turn `///`
// doc comments into user-visible help text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Displayed as "claude-code"; the preflight probe runs the Claude binary for this mode.
    ClaudeCode,
    // Displayed as "codex"; the preflight probe runs the Codex binary for this mode.
    Codex,
}
334
335impl std::fmt::Display for EnrichMode {
336 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337 match self {
338 EnrichMode::ClaudeCode => write!(f, "claude-code"),
339 EnrichMode::Codex => write!(f, "codex"),
340 }
341 }
342}
343
344#[derive(clap::Args)]
346#[command(
347 about = "Enrich graph memories and entities using an LLM provider",
348 after_long_help = "EXAMPLES:\n \
349 # Add missing entity bindings to all unbound memories\n \
350 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
351 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
352 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
353 # Expand short memory bodies (GAP-18)\n \
354 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
355 # Rebuild only missing memory embeddings without rewriting bodies\n \
356 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
357 # Resume an interrupted body-enrich run\n \
358 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
359 # Retry only failed items from a previous run\n \
360 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
361 EXIT CODES:\n \
362 0 success\n \
363 1 validation error (bad args, binary not found)\n \
364 14 I/O error"
365)]
366pub struct EnrichArgs {
367 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
369 pub operation: EnrichOperation,
370
371 #[arg(long, value_enum, default_value = "claude-code")]
373 pub mode: EnrichMode,
374
375 #[arg(long, value_name = "N")]
377 pub limit: Option<usize>,
378
379 #[arg(long)]
381 pub dry_run: bool,
382
383 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
385 pub namespace: Option<String>,
386
387 #[arg(long, value_name = "PATH")]
390 pub claude_binary: Option<PathBuf>,
391
392 #[arg(long, value_name = "MODEL")]
394 pub claude_model: Option<String>,
395
396 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
398 pub claude_timeout: u64,
399
400 #[arg(long, value_name = "PATH")]
403 pub codex_binary: Option<PathBuf>,
404
405 #[arg(long, value_name = "MODEL")]
407 pub codex_model: Option<String>,
408
409 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
411 pub codex_timeout: u64,
412
413 #[arg(long, value_name = "USD")]
416 pub max_cost_usd: Option<f64>,
417
418 #[arg(long)]
421 pub resume: bool,
422
423 #[arg(long)]
425 pub retry_failed: bool,
426
427 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
430 pub min_output_chars: usize,
431
432 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
434 pub max_output_chars: usize,
435
436 #[arg(long, default_value_t = true)]
438 pub preserve_check: bool,
439
440 #[arg(long, value_name = "PATH")]
442 pub prompt_template: Option<PathBuf>,
443
444 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
448 pub llm_parallelism: u32,
449
450 #[arg(long)]
453 pub json: bool,
454
455 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
457 pub db: Option<String>,
458
459 #[arg(long, value_name = "SECONDS")]
462 pub wait_job_singleton: Option<u64>,
463
464 #[arg(long, default_value_t = false)]
468 pub force_job_singleton: bool,
469
470 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
474 pub names: Vec<String>,
475
476 #[arg(long, value_name = "PATH")]
480 pub names_file: Option<PathBuf>,
481
482 #[arg(long, default_value_t = false)]
486 pub preflight_check: bool,
487
488 #[arg(long, value_enum)]
492 pub fallback_mode: Option<EnrichMode>,
493
494 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
497 pub rate_limit_buffer: u64,
498
499 #[arg(long, default_value_t = true)]
503 pub max_load_check: bool,
504
505 #[arg(long, value_name = "N", default_value_t = 5)]
508 pub circuit_breaker_threshold: u32,
509
510 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
517 pub preserve_threshold: f64,
518
519 #[arg(long, default_value_t = true)]
524 pub codex_model_validate: bool,
525
526 #[arg(long, value_name = "MODEL")]
531 pub codex_model_fallback: Option<String>,
532}
533
/// Serializable record describing a phase of the run.
///
/// Only `phase` is always present; every `Option` field is omitted from the
/// serialized output when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Name of the phase being reported.
    phase: &'a str,
    /// Path of the LLM binary, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Version string, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of items, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still pending, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Configured LLM parallelism, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
557
/// Serializable per-item progress record.
///
/// `item`, `status`, `index` and `total` are always serialized; every
/// `Option` field is omitted when it is `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Identifier of the processed item.
    item: &'a str,
    /// Outcome label for the item.
    status: &'a str,
    /// Memory row id, for memory-level items.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity row id, for entity-level items.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Entity count for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Relationship count for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before the operation.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after the operation.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Cost attributed to this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message for failed items.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run.
    /// NOTE(review): whether it is 0- or 1-based is not visible here.
    index: usize,
    /// Number of items in the run.
    total: usize,
}
584
/// Serializable end-of-run summary.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field.
    /// NOTE(review): presumably always `true`, so consumers can tell the
    /// summary apart from per-item records.
    summary: bool,
    /// Name of the operation that was run.
    operation: String,
    /// Total number of items considered.
    items_total: usize,
    /// Number of items that completed.
    completed: usize,
    /// Number of items that failed.
    failed: usize,
    /// Number of items that were skipped.
    skipped: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Total elapsed time in milliseconds.
    elapsed_ms: u64,
}
596
597use crate::output::emit_json_line as emit_json;
598
599fn open_queue_db(path: &str) -> Result<Connection, AppError> {
614 let conn = Connection::open(path)?;
615 conn.pragma_update(None, "journal_mode", "wal")?;
616 conn.execute_batch(
617 "CREATE TABLE IF NOT EXISTS queue (
618 id INTEGER PRIMARY KEY AUTOINCREMENT,
619 item_key TEXT NOT NULL UNIQUE,
620 item_type TEXT NOT NULL DEFAULT 'memory',
621 status TEXT NOT NULL DEFAULT 'pending',
622 memory_id INTEGER,
623 entity_id INTEGER,
624 entities INTEGER DEFAULT 0,
625 rels INTEGER DEFAULT 0,
626 error TEXT,
627 cost_usd REAL DEFAULT 0.0,
628 attempt INTEGER DEFAULT 0,
629 elapsed_ms INTEGER,
630 created_at TEXT DEFAULT (datetime('now')),
631 done_at TEXT
632 );
633 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
634 )?;
635 Ok(conn)
636}
637
638fn call_claude(
646 binary: &Path,
647 prompt: &str,
648 json_schema: &str,
649 input_text: &str,
650 model: Option<&str>,
651 timeout_secs: u64,
652) -> Result<(serde_json::Value, f64, bool), AppError> {
653 let result = crate::commands::claude_runner::run_claude(
654 binary,
655 prompt,
656 json_schema,
657 input_text,
658 model,
659 timeout_secs,
660 7,
661 )?;
662 Ok((result.value, result.cost_usd, result.is_oauth))
663}
664
/// Result of `run_preflight_probe`.
enum PreflightOutcome {
    /// The probe process exited successfully.
    Healthy,
    /// The probe exited unsuccessfully and its stderr contained one of the
    /// rate-limit markers checked by `run_preflight_probe`.
    RateLimited {
        /// Trimmed stderr of the probe process.
        reason: String,
        /// Static hint telling the user how to proceed.
        suggestion: &'static str,
    },
    /// Any other failure: binary lookup, spawn, timeout, or an unsuccessful
    /// probe without a rate-limit marker.
    Error(AppError),
}
683
684fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
692 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
693
694 match args.mode {
695 EnrichMode::ClaudeCode => {
696 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
697 Ok(b) => b,
698 Err(e) => return PreflightOutcome::Error(e),
699 };
700 let mut cmd = std::process::Command::new(&bin);
701 cmd.env_clear();
702 for var in &["PATH", "HOME", "USER"] {
703 if let Ok(val) = std::env::var(var) {
704 cmd.env(var, val);
705 }
706 }
707 cmd.arg("-p")
708 .arg("ping")
709 .arg("--max-turns")
710 .arg("1")
711 .arg("--strict-mcp-config")
712 .arg("--mcp-config")
713 .arg("{}")
714 .arg("--dangerously-skip-permissions")
715 .arg("--settings")
716 .arg("{\"hooks\":{}}")
717 .arg("--output-format")
718 .arg("json")
719 .stdin(std::process::Stdio::null())
720 .stdout(std::process::Stdio::piped())
721 .stderr(std::process::Stdio::piped());
722
723 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
724 Ok(c) => c,
725 Err(e) => {
726 return PreflightOutcome::Error(AppError::Io(e));
727 }
728 };
729 let output = match wait_with_timeout(child, timeout) {
730 Ok(out) => out,
731 Err(e) => return PreflightOutcome::Error(e),
732 };
733 if !output.status.success() {
734 let stderr = String::from_utf8_lossy(&output.stderr);
735 if stderr.contains("hit your session limit")
736 || stderr.contains("rate_limit")
737 || stderr.contains("429")
738 {
739 return PreflightOutcome::RateLimited {
740 reason: stderr.trim().to_string(),
741 suggestion:
742 "wait for the OAuth window to reset or use --fallback-mode codex",
743 };
744 }
745 return PreflightOutcome::Error(AppError::Validation(format!(
746 "preflight probe failed: {stderr}",
747 stderr = stderr.trim()
748 )));
749 }
750 PreflightOutcome::Healthy
751 }
752 EnrichMode::Codex => {
753 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
754 Ok(b) => b,
755 Err(e) => return PreflightOutcome::Error(e),
756 };
757 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
758 .map_err(PreflightOutcome::Error)
759 .ok();
760 let schema = "{}";
761 let schema_path = match super::codex_spawn::trusted_schema_path() {
762 Ok(p) => p,
763 Err(e) => return PreflightOutcome::Error(e),
764 };
765 let spawn_args = super::codex_spawn::CodexSpawnArgs {
766 binary: &bin,
767 prompt: "ping",
768 json_schema: schema,
769 input_text: "",
770 model: args.codex_model.as_deref(),
771 timeout_secs: args.rate_limit_buffer.max(60),
772 schema_path: schema_path.clone(),
773 };
774 let mut cmd = super::codex_spawn::build_codex_command(&spawn_args);
775 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
776 Ok(c) => c,
777 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
778 };
779 let output = match wait_with_timeout(child, timeout) {
780 Ok(out) => out,
781 Err(e) => return PreflightOutcome::Error(e),
782 };
783 let _ = std::fs::remove_file(&schema_path);
784 if !output.status.success() {
785 let stderr = String::from_utf8_lossy(&output.stderr);
786 if stderr.contains("rate_limit")
787 || stderr.contains("429")
788 || stderr.contains("Too Many Requests")
789 {
790 return PreflightOutcome::RateLimited {
791 reason: stderr.trim().to_string(),
792 suggestion: "wait for the rate-limit window to reset",
793 };
794 }
795 return PreflightOutcome::Error(AppError::Validation(format!(
796 "preflight probe failed: {stderr}",
797 stderr = stderr.trim()
798 )));
799 }
800 PreflightOutcome::Healthy
801 }
802 }
803}
804
805fn wait_with_timeout(
807 mut child: std::process::Child,
808 timeout: std::time::Duration,
809) -> Result<std::process::Output, AppError> {
810 use wait_timeout::ChildExt;
811 let start = std::time::Instant::now();
812 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
813 if status.is_none() {
814 let _ = child.kill();
815 let _ = child.wait();
816 return Err(AppError::Validation(format!(
817 "preflight probe timed out after {}s",
818 start.elapsed().as_secs()
819 )));
820 }
821 let mut stdout = Vec::new();
822 if let Some(mut out) = child.stdout.take() {
823 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
824 }
825 let mut stderr = Vec::new();
826 if let Some(mut err) = child.stderr.take() {
827 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
828 }
829 let exit = status.unwrap();
830 Ok(std::process::Output {
831 status: exit,
832 stdout,
833 stderr,
834 })
835}
836
837fn scan_unbound_memories(
848 conn: &Connection,
849 namespace: &str,
850 limit: Option<usize>,
851 name_filter: &[String],
852) -> Result<Vec<(i64, String, String)>, AppError> {
853 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
854
855 if name_filter.is_empty() {
856 let sql = format!(
857 "SELECT m.id, m.name, m.body
858 FROM memories m
859 WHERE m.namespace = ?1
860 AND m.deleted_at IS NULL
861 AND NOT EXISTS (
862 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
863 )
864 ORDER BY m.id
865 {limit_clause}"
866 );
867 let mut stmt = conn.prepare(&sql)?;
868 let rows = stmt
869 .query_map(rusqlite::params![namespace], |r| {
870 Ok((
871 r.get::<_, i64>(0)?,
872 r.get::<_, String>(1)?,
873 r.get::<_, String>(2)?,
874 ))
875 })?
876 .collect::<Result<Vec<_>, _>>()?;
877 Ok(rows)
878 } else {
879 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
881 .map(|i| format!("?{i}"))
882 .collect();
883 let in_clause = placeholders.join(", ");
884 let sql = format!(
885 "SELECT m.id, m.name, m.body
886 FROM memories m
887 WHERE m.namespace = ?1
888 AND m.deleted_at IS NULL
889 AND m.name IN ({in_clause})
890 AND NOT EXISTS (
891 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
892 )
893 ORDER BY m.id
894 {limit_clause}"
895 );
896 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
897 params_vec.push(&namespace);
898 for n in name_filter {
899 params_vec.push(n);
900 }
901 let mut stmt = conn.prepare(&sql)?;
902 let rows = stmt
903 .query_map(
904 rusqlite::params_from_iter(params_vec.iter().copied()),
905 |r| {
906 Ok((
907 r.get::<_, i64>(0)?,
908 r.get::<_, String>(1)?,
909 r.get::<_, String>(2)?,
910 ))
911 },
912 )?
913 .collect::<Result<Vec<_>, _>>()?;
914 Ok(rows)
915 }
916}
917
918fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
923 let content = std::fs::read_to_string(path).map_err(|e| {
924 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
925 })?;
926 let mut seen = std::collections::HashSet::new();
927 let mut out = Vec::new();
928 for line in content.lines() {
929 let trimmed = line.trim();
930 if trimmed.is_empty() || trimmed.starts_with('#') {
931 continue;
932 }
933 if seen.insert(trimmed.to_string()) {
934 out.push(trimmed.to_string());
935 }
936 }
937 Ok(out)
938}
939
940fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
942 let mut combined: Vec<String> = args.names.clone();
943 if let Some(p) = &args.names_file {
944 let from_file = read_names_file(p)?;
945 for n in from_file {
946 if !combined.contains(&n) {
947 combined.push(n);
948 }
949 }
950 }
951 Ok(combined)
952}
953
954fn scan_entities_without_description(
958 conn: &Connection,
959 namespace: &str,
960 limit: Option<usize>,
961) -> Result<Vec<(i64, String, String)>, AppError> {
962 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
963 let sql = format!(
964 "SELECT id, name, type
965 FROM entities
966 WHERE namespace = ?1
967 AND (description IS NULL OR description = '')
968 ORDER BY id
969 {limit_clause}"
970 );
971 let mut stmt = conn.prepare(&sql)?;
972 let rows = stmt
973 .query_map(rusqlite::params![namespace], |r| {
974 Ok((
975 r.get::<_, i64>(0)?,
976 r.get::<_, String>(1)?,
977 r.get::<_, String>(2)?,
978 ))
979 })?
980 .collect::<Result<Vec<_>, _>>()?;
981 Ok(rows)
982}
983
984fn scan_short_body_memories(
988 conn: &Connection,
989 namespace: &str,
990 min_chars: usize,
991 limit: Option<usize>,
992) -> Result<Vec<(i64, String, String)>, AppError> {
993 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
994 let sql = format!(
995 "SELECT m.id, m.name, m.body
996 FROM memories m
997 WHERE m.namespace = ?1
998 AND m.deleted_at IS NULL
999 AND LENGTH(COALESCE(m.body,'')) < ?2
1000 ORDER BY m.id
1001 {limit_clause}"
1002 );
1003 let mut stmt = conn.prepare(&sql)?;
1004 let rows = stmt
1005 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1006 Ok((
1007 r.get::<_, i64>(0)?,
1008 r.get::<_, String>(1)?,
1009 r.get::<_, String>(2)?,
1010 ))
1011 })?
1012 .collect::<Result<Vec<_>, _>>()?;
1013 Ok(rows)
1014}
1015
1016fn scan_memories_without_embeddings(
1020 conn: &Connection,
1021 namespace: &str,
1022 limit: Option<usize>,
1023 name_filter: &[String],
1024) -> Result<Vec<(i64, String, String)>, AppError> {
1025 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1026
1027 if name_filter.is_empty() {
1028 let sql = format!(
1029 "SELECT m.id, m.name, COALESCE(m.body,'')
1030 FROM memories m
1031 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1032 WHERE m.namespace = ?1
1033 AND m.deleted_at IS NULL
1034 AND me.memory_id IS NULL
1035 ORDER BY m.id
1036 {limit_clause}"
1037 );
1038 let mut stmt = conn.prepare(&sql)?;
1039 let rows = stmt
1040 .query_map(rusqlite::params![namespace], |r| {
1041 Ok((
1042 r.get::<_, i64>(0)?,
1043 r.get::<_, String>(1)?,
1044 r.get::<_, String>(2)?,
1045 ))
1046 })?
1047 .collect::<Result<Vec<_>, _>>()?;
1048 Ok(rows)
1049 } else {
1050 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1051 .map(|i| format!("?{i}"))
1052 .collect();
1053 let in_clause = placeholders.join(", ");
1054 let sql = format!(
1055 "SELECT m.id, m.name, COALESCE(m.body,'')
1056 FROM memories m
1057 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1058 WHERE m.namespace = ?1
1059 AND m.deleted_at IS NULL
1060 AND m.name IN ({in_clause})
1061 AND me.memory_id IS NULL
1062 ORDER BY m.id
1063 {limit_clause}"
1064 );
1065 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1066 params_vec.push(&namespace);
1067 for n in name_filter {
1068 params_vec.push(n);
1069 }
1070 let mut stmt = conn.prepare(&sql)?;
1071 let rows = stmt
1072 .query_map(
1073 rusqlite::params_from_iter(params_vec.iter().copied()),
1074 |r| {
1075 Ok((
1076 r.get::<_, i64>(0)?,
1077 r.get::<_, String>(1)?,
1078 r.get::<_, String>(2)?,
1079 ))
1080 },
1081 )?
1082 .collect::<Result<Vec<_>, _>>()?;
1083 Ok(rows)
1084 }
1085}
1086
1087#[allow(clippy::type_complexity)]
1089fn scan_weight_candidates(
1090 conn: &Connection,
1091 namespace: &str,
1092 limit: Option<usize>,
1093) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1094 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1095 let sql = format!(
1096 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1097 FROM relationships r \
1098 JOIN entities e1 ON e1.id = r.source_id \
1099 JOIN entities e2 ON e2.id = r.target_id \
1100 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1101 ORDER BY r.weight DESC {limit_clause}"
1102 );
1103 let mut stmt = conn.prepare(&sql)?;
1104 let rows = stmt
1105 .query_map(rusqlite::params![namespace], |r| {
1106 Ok((
1107 r.get::<_, i64>(0)?,
1108 r.get::<_, String>(1)?,
1109 r.get::<_, String>(2)?,
1110 r.get::<_, String>(3)?,
1111 r.get::<_, f64>(4)?,
1112 ))
1113 })?
1114 .collect::<Result<Vec<_>, _>>()?;
1115 Ok(rows)
1116}
1117
1118fn scan_generic_relations(
1120 conn: &Connection,
1121 namespace: &str,
1122 limit: Option<usize>,
1123) -> Result<Vec<(i64, String, String, String)>, AppError> {
1124 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1125 let sql = format!(
1126 "SELECT r.id, e1.name, e2.name, r.relation \
1127 FROM relationships r \
1128 JOIN entities e1 ON e1.id = r.source_id \
1129 JOIN entities e2 ON e2.id = r.target_id \
1130 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1131 ORDER BY r.id {limit_clause}"
1132 );
1133 let mut stmt = conn.prepare(&sql)?;
1134 let rows = stmt
1135 .query_map(rusqlite::params![namespace], |r| {
1136 Ok((
1137 r.get::<_, i64>(0)?,
1138 r.get::<_, String>(1)?,
1139 r.get::<_, String>(2)?,
1140 r.get::<_, String>(3)?,
1141 ))
1142 })?
1143 .collect::<Result<Vec<_>, _>>()?;
1144 Ok(rows)
1145}
1146
1147fn persist_memory_bindings(
1156 conn: &Connection,
1157 namespace: &str,
1158 memory_id: i64,
1159 entities_json: &serde_json::Value,
1160 rels_json: &serde_json::Value,
1161) -> Result<(usize, usize), AppError> {
1162 #[derive(Deserialize)]
1163 struct EntityItem {
1164 name: String,
1165 entity_type: String,
1166 }
1167 #[derive(Deserialize)]
1168 struct RelItem {
1169 source: String,
1170 target: String,
1171 relation: String,
1172 strength: f64,
1173 }
1174
1175 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1176 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1177
1178 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1179 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1180
1181 let mut ent_count = 0usize;
1182 let mut rel_count = 0usize;
1183
1184 for item in &extracted_entities {
1185 let entity_type = match item.entity_type.parse::<EntityType>() {
1186 Ok(et) => et,
1187 Err(_) => {
1188 tracing::warn!(
1189 target: "enrich",
1190 entity = %item.name,
1191 entity_type = %item.entity_type,
1192 "entity type not recognized, skipping"
1193 );
1194 continue;
1195 }
1196 };
1197 match entities::upsert_entity(
1198 conn,
1199 namespace,
1200 &NewEntity {
1201 name: item.name.clone(),
1202 entity_type,
1203 description: None,
1204 },
1205 ) {
1206 Ok(eid) => {
1207 let _ = entities::link_memory_entity(conn, memory_id, eid);
1208 ent_count += 1;
1209 }
1210 Err(e) => {
1211 tracing::warn!(
1212 target: "enrich",
1213 entity = %item.name,
1214 error = %e,
1215 "entity upsert skipped"
1216 );
1217 }
1218 }
1219 }
1220
1221 for rel in &extracted_rels {
1222 let normalized = crate::parsers::normalize_relation(&rel.relation);
1223 crate::parsers::warn_if_non_canonical(&normalized);
1224
1225 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1228 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1229 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1230 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1231 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1232 let new_rel = NewRelationship {
1233 source: rel.source.clone(),
1234 target: rel.target.clone(),
1235 relation: normalized,
1236 strength: rel.strength,
1237 description: None,
1238 };
1239 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1240 rel_count += 1;
1241 }
1242 }
1243 }
1244
1245 Ok((ent_count, rel_count))
1246}
1247
1248fn persist_entity_description(
1250 conn: &Connection,
1251 entity_id: i64,
1252 description: &str,
1253) -> Result<(), AppError> {
1254 conn.execute(
1255 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1256 rusqlite::params![description, entity_id],
1257 )?;
1258 Ok(())
1259}
1260
1261#[allow(clippy::too_many_arguments)]
1262fn reembed_memory_vector(
1263 conn: &Connection,
1264 namespace: &str,
1265 memory_id: i64,
1266 memory_name: &str,
1267 memory_type: &str,
1268 body: &str,
1269 paths: &crate::paths::AppPaths,
1270 llm_backend: crate::cli::LlmBackendChoice,
1271) -> Result<(), AppError> {
1272 let snippet: String = body.chars().take(200).collect();
1273 let embedding =
1275 crate::embedder::embed_passage_with_choice(&paths.models, body, Some(llm_backend))?;
1276 memories::upsert_vec(
1277 conn,
1278 memory_id,
1279 namespace,
1280 memory_type,
1281 &embedding,
1282 memory_name,
1283 &snippet,
1284 )?;
1285 Ok(())
1286}
1287
1288fn persist_enriched_body(
1293 conn: &Connection,
1294 namespace: &str,
1295 memory_id: i64,
1296 memory_name: &str,
1297 new_body: &str,
1298 paths: &crate::paths::AppPaths,
1299 llm_backend: crate::cli::LlmBackendChoice,
1300) -> Result<(), AppError> {
1301 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1303 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1304 rusqlite::params![memory_id],
1305 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1306 )?;
1307
1308 let memory_type: String = conn.query_row(
1309 "SELECT type FROM memories WHERE id=?1",
1310 rusqlite::params![memory_id],
1311 |r| r.get(0),
1312 )?;
1313
1314 let description: String = conn.query_row(
1315 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1316 rusqlite::params![memory_id],
1317 |r| r.get(0),
1318 )?;
1319
1320 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1321
1322 let new_memory = memories::NewMemory {
1323 namespace: namespace.to_string(),
1324 name: memory_name.to_string(),
1325 memory_type: memory_type.clone(),
1326 description: description.clone(),
1327 body: new_body.to_string(),
1328 body_hash,
1329 session_id: None,
1330 source: "agent".to_string(),
1331 metadata: serde_json::json!({
1332 "operation": "body-enrich",
1333 "orig_chars": old_body.chars().count(),
1334 "new_chars": new_body.chars().count(),
1335 }),
1336 };
1337
1338 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1342 let version_metadata = serde_json::json!({
1343 "operation": "body-enrich",
1344 "orig_chars": old_body.chars().count(),
1345 "new_chars": new_body.chars().count(),
1346 })
1347 .to_string();
1348 crate::storage::versions::insert_version(
1349 conn,
1350 memory_id,
1351 next_version,
1352 memory_name,
1353 &memory_type,
1354 &description,
1355 new_body,
1356 &version_metadata,
1357 Some("enrich"),
1358 "edit",
1359 )?;
1360
1361 memories::update(conn, memory_id, &new_memory, None)?;
1362 memories::sync_fts_after_update(
1363 conn,
1364 memory_id,
1365 &old_name,
1366 &old_desc,
1367 &old_body,
1368 &new_memory.name,
1369 &new_memory.description,
1370 &new_memory.body,
1371 )?;
1372
1373 if let Err(e) = reembed_memory_vector(
1375 conn,
1376 namespace,
1377 memory_id,
1378 memory_name,
1379 &memory_type,
1380 new_body,
1381 paths,
1382 llm_backend,
1383 ) {
1384 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1385 }
1386
1387 Ok(())
1388}
1389
/// Returns `true` when `value` compares equal to `default` under `PartialEq`.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1404
1405fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1420 const DEFAULT_TIMEOUT: u64 = 300;
1421
1422 let mut conflicts: Vec<String> = Vec::new();
1423
1424 match args.mode {
1425 EnrichMode::ClaudeCode => {
1426 if args.codex_binary.is_some() {
1427 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1428 }
1429 if args.codex_model.is_some() {
1430 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1431 }
1432 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1433 conflicts.push(format!(
1434 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1435 args.codex_timeout
1436 ));
1437 }
1438 }
1439 EnrichMode::Codex => {
1440 if args.claude_binary.is_some() {
1441 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1442 }
1443 if args.claude_model.is_some() {
1444 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1445 }
1446 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1447 conflicts.push(format!(
1448 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1449 args.claude_timeout
1450 ));
1451 }
1452 if args.max_cost_usd.is_some() {
1453 conflicts.push(
1454 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1455 .to_string(),
1456 );
1457 }
1458 }
1459 }
1460
1461 if !conflicts.is_empty() {
1462 return Err(AppError::Validation(format!(
1463 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1464 args.mode,
1465 conflicts.join("\n - ")
1466 )));
1467 }
1468
1469 Ok(())
1470}
1471
1472pub fn run(args: &EnrichArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1476 validate_mode_conditional_flags_enrich(args)?;
1479 let started = Instant::now();
1480
1481 let paths = AppPaths::resolve(args.db.as_deref())?;
1482 ensure_db_ready(&paths)?;
1483 let conn = open_rw(&paths.db)?;
1484 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1485
1486 let wait_secs = args.wait_job_singleton;
1492 let force_flag = args.force_job_singleton;
1493 let _singleton = crate::lock::acquire_job_singleton(
1494 crate::lock::JobType::Enrich,
1495 &namespace,
1496 &paths.db,
1497 wait_secs,
1498 force_flag,
1499 )?;
1500
1501 let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1503 None
1504 } else {
1505 Some(match args.mode {
1506 EnrichMode::ClaudeCode => {
1507 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1508 let version = super::claude_runner::validate_claude_version(&bin)?;
1509 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1510 emit_json(&PhaseEvent {
1511 phase: "validate",
1512 binary_path: bin.to_str(),
1513 version: Some(&version),
1514 items_total: None,
1515 items_pending: None,
1516 llm_parallelism: None,
1517 });
1518 bin
1519 }
1520 EnrichMode::Codex => {
1521 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1522 emit_json(&PhaseEvent {
1523 phase: "validate",
1524 binary_path: bin.to_str(),
1525 version: None,
1526 items_total: None,
1527 items_pending: None,
1528 llm_parallelism: None,
1529 });
1530 bin
1531 }
1532 })
1533 };
1534
1535 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1539 let load = crate::system_load::load_average_one();
1540 let n = crate::system_load::ncpus();
1541 return Err(AppError::Validation(format!(
1542 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1543 pass --no-max-load-check to override (not recommended)"
1544 )));
1545 }
1546
1547 if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1554 {
1555 let preflight_result = run_preflight_probe(args);
1556 match preflight_result {
1557 PreflightOutcome::Healthy => {
1558 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1559 }
1560 PreflightOutcome::RateLimited { reason, suggestion } => {
1561 if let Some(fallback) = args.fallback_mode.clone() {
1562 if fallback != args.mode {
1563 return Err(AppError::Validation(format!(
1573 "preflight detected rate limit on {mode:?}: {reason}; \
1574 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1575 mode = args.mode
1576 )));
1577 }
1578 return Err(AppError::Validation(format!(
1579 "preflight detected rate limit on {mode:?}: {reason}; \
1580 --fallback-mode matches --mode, no recovery possible",
1581 mode = args.mode
1582 )));
1583 }
1584 return Err(AppError::Validation(format!(
1585 "preflight detected rate limit on {mode:?}: {reason}; \
1586 {suggestion}; pass --fallback-mode codex to recover",
1587 mode = args.mode
1588 )));
1589 }
1590 PreflightOutcome::Error(e) => {
1591 return Err(e);
1592 }
1593 }
1594 }
1595
1596 let scan_result = scan_operation(&conn, &namespace, args)?;
1598 let total = scan_result.len();
1599
1600 emit_json(&PhaseEvent {
1601 phase: "scan",
1602 binary_path: None,
1603 version: None,
1604 items_total: Some(total),
1605 items_pending: Some(total),
1606 llm_parallelism: Some(args.llm_parallelism),
1607 });
1608
1609 if args.dry_run {
1611 for (idx, key) in scan_result.iter().enumerate() {
1612 emit_json(&ItemEvent {
1613 item: key,
1614 status: "preview",
1615 memory_id: None,
1616 entity_id: None,
1617 entities: None,
1618 rels: None,
1619 chars_before: None,
1620 chars_after: None,
1621 cost_usd: None,
1622 elapsed_ms: None,
1623 error: None,
1624 index: idx,
1625 total,
1626 });
1627 }
1628 emit_json(&EnrichSummary {
1629 summary: true,
1630 operation: format!("{:?}", args.operation),
1631 items_total: total,
1632 completed: 0,
1633 failed: 0,
1634 skipped: 0,
1635 cost_usd: 0.0,
1636 elapsed_ms: started.elapsed().as_millis() as u64,
1637 });
1638 return Ok(());
1639 }
1640
1641 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1645
1646 if args.resume {
1647 let reset = queue_conn
1648 .execute(
1649 "UPDATE queue SET status='pending' WHERE status='processing'",
1650 [],
1651 )
1652 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1653 if reset > 0 {
1654 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1655 }
1656 }
1657
1658 if args.retry_failed {
1659 let count = queue_conn
1660 .execute(
1661 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1662 [],
1663 )
1664 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1665 tracing::info!(target: "enrich", count, "retrying failed items");
1666 }
1667
1668 if !args.resume && !args.retry_failed {
1669 queue_conn
1670 .execute("DELETE FROM queue", [])
1671 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1672 }
1673
1674 for (idx, key) in scan_result.iter().enumerate() {
1676 let item_type = match args.operation {
1677 EnrichOperation::EntityDescriptions => "entity",
1678 _ => "memory",
1679 };
1680 if let Err(e) = queue_conn.execute(
1681 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1682 rusqlite::params![key, item_type],
1683 ) {
1684 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1685 }
1686 let _ = idx; }
1688
1689 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1692 if parallelism > 1 {
1693 tracing::info!(
1694 target: "enrich",
1695 llm_parallelism = parallelism,
1696 "parallel LLM processing with bounded thread pool"
1697 );
1698 }
1699 if parallelism > 4 {
1703 match args.mode {
1704 EnrichMode::ClaudeCode => {
1705 tracing::warn!(
1706 target: "enrich",
1707 llm_parallelism = parallelism,
1708 recommended_max = 4,
1709 mode = "claude-code",
1710 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1711 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1712 to cut MCP children (G28-A)"
1713 );
1714 }
1715 EnrichMode::Codex if parallelism > 16 => {
1716 tracing::warn!(
1717 target: "enrich",
1718 llm_parallelism = parallelism,
1719 recommended_max = 16,
1720 mode = "codex",
1721 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1722 consider --llm-parallelism 8 for safer concurrency"
1723 );
1724 }
1725 EnrichMode::Codex => {
1726 }
1730 }
1731 }
1732
1733 let mut completed = 0usize;
1734 let mut failed = 0usize;
1735 let mut skipped = 0usize;
1736 let mut cost_total = 0.0f64;
1737 let mut oauth_detected = false;
1738 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1739 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1740 let enrich_started = std::time::Instant::now();
1741
1742 let provider_timeout = match args.mode {
1743 EnrichMode::ClaudeCode => args.claude_timeout,
1744 EnrichMode::Codex => args.codex_timeout,
1745 };
1746
1747 let provider_model: Option<&str> = match args.mode {
1748 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1749 EnrichMode::Codex => args.codex_model.as_deref(),
1750 };
1751
1752 if parallelism > 1 {
1756 let stdout_mu = parking_lot::Mutex::new(());
1757 let budget = args.max_cost_usd;
1758 let operation = args.operation.clone();
1759 let mode = args.mode.clone();
1760 let min_oc = args.min_output_chars;
1761 let max_oc = args.max_output_chars;
1762 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1763
1764 struct WorkerResult {
1765 completed: usize,
1766 failed: usize,
1767 skipped: usize,
1768 cost: f64,
1769 oauth: bool,
1770 }
1771
1772 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1773 let handles: Vec<_> = (0..parallelism)
1774 .map(|worker_id| {
1775 let stdout_mu = &stdout_mu;
1776 let paths = &paths;
1777 let namespace = &namespace;
1778 let provider_binary = provider_binary.as_deref();
1779 let operation = &operation;
1780 let mode = &mode;
1781 let prompt_tpl = prompt_tpl.as_deref();
1782 s.spawn(move || {
1783 let w_conn = match open_rw(&paths.db) {
1784 Ok(c) => c,
1785 Err(e) => {
1786 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1787 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1788 }
1789 };
1790 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1791 Ok(c) => c,
1792 Err(e) => {
1793 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1794 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1795 }
1796 };
1797 let mut w_completed = 0usize;
1798 let mut w_failed = 0usize;
1799 let mut w_skipped = 0usize;
1800 let mut w_cost = 0.0f64;
1801 let mut w_oauth = false;
1802 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1803 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1804 let mut w_breaker = crate::retry::CircuitBreaker::new(
1810 args.circuit_breaker_threshold.max(1),
1811 std::time::Duration::from_secs(60),
1812 );
1813
1814 loop {
1815 if crate::shutdown_requested() {
1816 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1817 break;
1818 }
1819 if let Some(b) = budget {
1820 if !w_oauth && w_cost >= b {
1821 break;
1822 }
1823 }
1824 let pending: Option<(i64, String, String)> = w_queue
1825 .query_row(
1826 "UPDATE queue SET status='processing', attempt=attempt+1 \
1827 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1828 RETURNING id, item_key, item_type",
1829 [],
1830 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1831 )
1832 .ok();
1833 let (queue_id, item_key, _item_type) = match pending {
1834 Some(p) => p,
1835 None => break,
1836 };
1837 let item_started = Instant::now();
1838 let current_index = w_completed + w_failed + w_skipped;
1839
1840 let call_result = match operation {
1841 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1842 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1843 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend),
1844 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend),
1845 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1846 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1847 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1848 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1849 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1850 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1851 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1852 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1853 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1854 };
1855
1856 match call_result {
1857 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1858 if is_oauth { w_oauth = true; }
1859 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1860 let _ = w_queue.execute(
1861 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1862 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1863 );
1864 w_completed += 1;
1865 if !is_oauth { w_cost += cost; }
1866 let _ = w_breaker
1868 .record(crate::retry::AttemptOutcome::Success);
1869 let _guard = stdout_mu.lock();
1870 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1871 }
1872 Ok(EnrichItemResult::Skipped { reason }) => {
1873 w_skipped += 1;
1874 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1875 let _guard = stdout_mu.lock();
1876 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1877 }
1878 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1879 w_skipped += 1;
1885 let reason = format!(
1886 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1887 );
1888 let _ = w_queue.execute(
1889 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1890 rusqlite::params![reason, queue_id],
1891 );
1892 let _guard = stdout_mu.lock();
1893 emit_json(&ItemEvent {
1894 item: &item_key,
1895 status: "preservation_failed",
1896 memory_id: None,
1897 entity_id: None,
1898 entities: None,
1899 rels: None,
1900 chars_before: Some(chars_before),
1901 chars_after: Some(chars_after),
1902 cost_usd: None,
1903 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1904 error: Some(reason),
1905 index: current_index,
1906 total,
1907 });
1908 }
1909 Err(e) => {
1910 let err_str = format!("{e}");
1911 if matches!(e, AppError::RateLimited { .. }) {
1912 if crate::retry::is_kill_switch_active() {
1913 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1914 } else if std::time::Instant::now() >= w_deadline {
1915 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1916 } else {
1917 let half = w_backoff / 2;
1918 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1919 let actual_wait = half + jitter;
1920 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1921 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1922 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1923 w_backoff = (w_backoff * 2).min(900);
1924 continue;
1925 }
1926 }
1927 w_failed += 1;
1928 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1929 let _guard = stdout_mu.lock();
1930 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1931 let breaker_opened = w_breaker
1933 .record(crate::retry::AttemptOutcome::HardFailure);
1934 if breaker_opened {
1935 tracing::error!(target: "enrich",
1936 consecutive_failures = w_breaker.consecutive_failures(),
1937 "circuit breaker opened — aborting worker"
1938 );
1939 break;
1940 }
1941 }
1942 }
1943 }
1944 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1945 })
1946 })
1947 .collect();
1948 handles
1949 .into_iter()
1950 .map(|h| {
1951 h.join().unwrap_or(WorkerResult {
1952 completed: 0,
1953 failed: 0,
1954 skipped: 0,
1955 cost: 0.0,
1956 oauth: false,
1957 })
1958 })
1959 .collect()
1960 });
1961
1962 for r in &results {
1963 completed += r.completed;
1964 failed += r.failed;
1965 skipped += r.skipped;
1966 cost_total += r.cost;
1967 if r.oauth && !oauth_detected {
1968 oauth_detected = true;
1969 }
1970 }
1971 } else {
1972 loop {
1974 if crate::shutdown_requested() {
1975 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
1976 break;
1977 }
1978
1979 if let Some(budget) = args.max_cost_usd {
1981 if !oauth_detected && cost_total >= budget {
1982 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1983 break;
1984 }
1985 }
1986
1987 let pending: Option<(i64, String, String)> = queue_conn
1989 .query_row(
1990 "UPDATE queue SET status='processing', attempt=attempt+1 \
1991 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1992 RETURNING id, item_key, item_type",
1993 [],
1994 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1995 )
1996 .ok();
1997
1998 let (queue_id, item_key, item_type) = match pending {
1999 Some(p) => p,
2000 None => break,
2001 };
2002
2003 let item_started = Instant::now();
2004 let current_index = completed + failed + skipped;
2005
2006 let call_result = match args.operation {
2007 EnrichOperation::MemoryBindings => call_memory_bindings(
2008 &conn,
2009 &namespace,
2010 &item_key,
2011 provider_binary
2012 .as_deref()
2013 .expect("provider binary required"),
2014 provider_model,
2015 provider_timeout,
2016 &args.mode,
2017 ),
2018 EnrichOperation::EntityDescriptions => call_entity_description(
2019 &conn,
2020 &namespace,
2021 &item_key,
2022 provider_binary
2023 .as_deref()
2024 .expect("provider binary required"),
2025 provider_model,
2026 provider_timeout,
2027 &args.mode,
2028 ),
2029 EnrichOperation::BodyEnrich => call_body_enrich(
2030 &conn,
2031 &namespace,
2032 &item_key,
2033 provider_binary
2034 .as_deref()
2035 .expect("provider binary required"),
2036 provider_model,
2037 provider_timeout,
2038 &args.mode,
2039 args.min_output_chars,
2040 args.max_output_chars,
2041 args.prompt_template.as_deref(),
2042 args.preserve_threshold,
2043 &paths,
2044 llm_backend,
2045 ),
2046 EnrichOperation::ReEmbed => {
2047 call_reembed(&conn, &namespace, &item_key, &paths, llm_backend)
2048 }
2049 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2050 &conn,
2051 &namespace,
2052 &item_key,
2053 provider_binary
2054 .as_deref()
2055 .expect("provider binary required"),
2056 provider_model,
2057 provider_timeout,
2058 &args.mode,
2059 ),
2060 EnrichOperation::RelationReclassify => call_relation_reclassify(
2061 &conn,
2062 &namespace,
2063 &item_key,
2064 provider_binary
2065 .as_deref()
2066 .expect("provider binary required"),
2067 provider_model,
2068 provider_timeout,
2069 &args.mode,
2070 ),
2071 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2072 call_entity_connect(
2073 &conn,
2074 &namespace,
2075 &item_key,
2076 provider_binary
2077 .as_deref()
2078 .expect("provider binary required"),
2079 provider_model,
2080 provider_timeout,
2081 &args.mode,
2082 )
2083 }
2084 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2085 &conn,
2086 &namespace,
2087 &item_key,
2088 provider_binary
2089 .as_deref()
2090 .expect("provider binary required"),
2091 provider_model,
2092 provider_timeout,
2093 &args.mode,
2094 ),
2095 EnrichOperation::DescriptionEnrich => call_description_enrich(
2096 &conn,
2097 &namespace,
2098 &item_key,
2099 provider_binary
2100 .as_deref()
2101 .expect("provider binary required"),
2102 provider_model,
2103 provider_timeout,
2104 &args.mode,
2105 ),
2106 EnrichOperation::DomainClassify => call_domain_classify(
2107 &conn,
2108 &namespace,
2109 &item_key,
2110 provider_binary
2111 .as_deref()
2112 .expect("provider binary required"),
2113 provider_model,
2114 provider_timeout,
2115 &args.mode,
2116 ),
2117 EnrichOperation::GraphAudit => call_graph_audit(
2118 &conn,
2119 &namespace,
2120 &item_key,
2121 provider_binary
2122 .as_deref()
2123 .expect("provider binary required"),
2124 provider_model,
2125 provider_timeout,
2126 &args.mode,
2127 ),
2128 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2129 &conn,
2130 &namespace,
2131 &item_key,
2132 provider_binary
2133 .as_deref()
2134 .expect("provider binary required"),
2135 provider_model,
2136 provider_timeout,
2137 &args.mode,
2138 ),
2139 EnrichOperation::BodyExtract => call_body_extract(
2140 &conn,
2141 &namespace,
2142 &item_key,
2143 provider_binary
2144 .as_deref()
2145 .expect("provider binary required"),
2146 provider_model,
2147 provider_timeout,
2148 &args.mode,
2149 ),
2150 };
2151
2152 match call_result {
2153 Ok(EnrichItemResult::Done {
2154 memory_id,
2155 entity_id,
2156 entities,
2157 rels,
2158 chars_before,
2159 chars_after,
2160 cost,
2161 is_oauth,
2162 }) => {
2163 if is_oauth && !oauth_detected {
2164 oauth_detected = true;
2165 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2166 }
2167 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2168
2169 let persist_err: Option<String> = match args.operation {
2171 EnrichOperation::MemoryBindings => {
2172 None
2174 }
2175 EnrichOperation::EntityDescriptions => {
2176 None
2178 }
2179 EnrichOperation::BodyEnrich => {
2180 None
2182 }
2183 _ => {
2184 None
2186 }
2187 };
2188
2189 if let Err(e) = queue_conn.execute(
2190 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2191 rusqlite::params![
2192 memory_id,
2193 entity_id,
2194 entities as i64,
2195 rels as i64,
2196 cost,
2197 item_started.elapsed().as_millis() as i64,
2198 queue_id
2199 ],
2200 ) {
2201 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2202 }
2203
2204 if persist_err.is_none() {
2205 completed += 1;
2206 if !is_oauth {
2207 cost_total += cost;
2208 }
2209 emit_json(&ItemEvent {
2210 item: &item_key,
2211 status: "done",
2212 memory_id,
2213 entity_id,
2214 entities: Some(entities),
2215 rels: Some(rels),
2216 chars_before,
2217 chars_after,
2218 cost_usd: if is_oauth { None } else { Some(cost) },
2219 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2220 error: None,
2221 index: current_index,
2222 total,
2223 });
2224 } else {
2225 failed += 1;
2226 emit_json(&ItemEvent {
2227 item: &item_key,
2228 status: "failed",
2229 memory_id: None,
2230 entity_id: None,
2231 entities: None,
2232 rels: None,
2233 chars_before: None,
2234 chars_after: None,
2235 cost_usd: None,
2236 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2237 error: persist_err,
2238 index: current_index,
2239 total,
2240 });
2241 }
2242 }
2243 Ok(EnrichItemResult::Skipped { reason }) => {
2244 skipped += 1;
2245 if let Err(e) = queue_conn.execute(
2246 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2247 rusqlite::params![reason, queue_id],
2248 ) {
2249 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2250 }
2251 emit_json(&ItemEvent {
2252 item: &item_key,
2253 status: "skipped",
2254 memory_id: None,
2255 entity_id: None,
2256 entities: None,
2257 rels: None,
2258 chars_before: None,
2259 chars_after: None,
2260 cost_usd: None,
2261 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2262 error: None,
2263 index: current_index,
2264 total,
2265 });
2266 }
2267 Ok(EnrichItemResult::PreservationFailed {
2268 score,
2269 threshold,
2270 chars_before,
2271 chars_after,
2272 }) => {
2273 skipped += 1;
2280 let reason = format!(
2281 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2282 );
2283 if let Err(qe) = queue_conn.execute(
2284 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2285 rusqlite::params![reason, queue_id],
2286 ) {
2287 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2288 }
2289 emit_json(&ItemEvent {
2290 item: &item_key,
2291 status: "preservation_failed",
2292 memory_id: None,
2293 entity_id: None,
2294 entities: None,
2295 rels: None,
2296 chars_before: Some(chars_before),
2297 chars_after: Some(chars_after),
2298 cost_usd: None,
2299 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2300 error: Some(reason),
2301 index: current_index,
2302 total,
2303 });
2304 }
2305 Err(e) => {
2306 let err_str = format!("{e}");
2307 if matches!(e, AppError::RateLimited { .. }) {
2308 if crate::retry::is_kill_switch_active() {
2309 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2310 } else if std::time::Instant::now() >= rate_limit_deadline {
2311 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2312 } else {
2313 let half = backoff_secs / 2;
2314 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2315 let actual_wait = half + jitter;
2316 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2317 if let Err(qe) = queue_conn.execute(
2318 "UPDATE queue SET status='pending' WHERE id=?1",
2319 rusqlite::params![queue_id],
2320 ) {
2321 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2322 }
2323 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2324 backoff_secs = (backoff_secs * 2).min(900);
2325 continue;
2326 }
2327 }
2328
2329 failed += 1;
2330 if let Err(qe) = queue_conn.execute(
2331 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2332 rusqlite::params![err_str, queue_id],
2333 ) {
2334 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2335 }
2336 emit_json(&ItemEvent {
2337 item: &item_key,
2338 status: "failed",
2339 memory_id: None,
2340 entity_id: None,
2341 entities: None,
2342 rels: None,
2343 chars_before: None,
2344 chars_after: None,
2345 cost_usd: None,
2346 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2347 error: Some(err_str),
2348 index: current_index,
2349 total,
2350 });
2351 }
2352 }
2353
2354 let _ = item_type; }
2356 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2359 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2360
2361 emit_json(&EnrichSummary {
2362 summary: true,
2363 operation: format!("{:?}", args.operation),
2364 items_total: total,
2365 completed,
2366 failed,
2367 skipped,
2368 cost_usd: cost_total,
2369 elapsed_ms: started.elapsed().as_millis() as u64,
2370 });
2371
2372 if failed == 0 {
2373 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2374 }
2375
2376 Ok(())
2377}
2378
/// Outcome of processing one queued enrichment item.
///
/// The run loop maps each variant onto a queue status update and an emitted
/// item event.
enum EnrichItemResult {
    /// The item was processed successfully.
    Done {
        /// Id of the memory the item refers to, when the operation targets a memory.
        memory_id: Option<i64>,
        /// Id of the entity the item refers to, when the operation targets an entity.
        entity_id: Option<i64>,
        /// Count reported in the `entities` field of the item event.
        entities: usize,
        /// Count reported in the `rels` field of the item event.
        rels: usize,
        /// Size of the text before the operation, when the operation reports one.
        chars_before: Option<usize>,
        /// Size of the text after the operation, when the operation reports one.
        chars_after: Option<usize>,
        /// Cost returned by the LLM call (`0.0` when no LLM call is made).
        cost: f64,
        /// When `true`, the run loop neither adds `cost` to the running total
        /// nor reports a `cost_usd` value for the item.
        is_oauth: bool,
    },
    /// The item was deliberately left untouched.
    Skipped {
        /// Explanation; the run loop stores it in the queue row's `error` column.
        reason: String,
    },
    /// The enriched body was rejected by the preservation check.
    ///
    /// The run loop counts this as skipped and emits a `preservation_failed`
    /// event.
    PreservationFailed {
        /// Similarity score obtained for the rejected rewrite.
        score: f64,
        /// Threshold the score was compared against.
        threshold: f64,
        /// Character count of the original body.
        chars_before: usize,
        /// Character count of the rejected body.
        chars_after: usize,
    },
}
2408
2409fn call_memory_bindings(
2414 conn: &Connection,
2415 namespace: &str,
2416 memory_name: &str,
2417 binary: &Path,
2418 model: Option<&str>,
2419 timeout: u64,
2420 mode: &EnrichMode,
2421) -> Result<EnrichItemResult, AppError> {
2422 let (memory_id, body): (i64, String) = conn.query_row(
2424 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2425 rusqlite::params![namespace, memory_name],
2426 |r| Ok((r.get(0)?, r.get(1)?)),
2427 ).map_err(|e| match e {
2428 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2429 other => AppError::Database(other),
2430 })?;
2431
2432 if body.trim().is_empty() {
2433 return Ok(EnrichItemResult::Skipped {
2434 reason: "body is empty".to_string(),
2435 });
2436 }
2437
2438 let (value, cost, is_oauth) = match mode {
2439 EnrichMode::ClaudeCode => call_claude(
2440 binary,
2441 BINDINGS_PROMPT,
2442 BINDINGS_SCHEMA,
2443 &body,
2444 model,
2445 timeout,
2446 )?,
2447 EnrichMode::Codex => call_codex(
2448 binary,
2449 BINDINGS_PROMPT,
2450 BINDINGS_SCHEMA,
2451 &body,
2452 model,
2453 timeout,
2454 )?,
2455 };
2456
2457 let empty_arr = serde_json::Value::Array(vec![]);
2458 let entities_val = value.get("entities").unwrap_or(&empty_arr);
2459 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2460
2461 let (ent_count, rel_count) =
2462 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2463
2464 Ok(EnrichItemResult::Done {
2465 memory_id: Some(memory_id),
2466 entity_id: None,
2467 entities: ent_count,
2468 rels: rel_count,
2469 chars_before: None,
2470 chars_after: None,
2471 cost,
2472 is_oauth,
2473 })
2474}
2475
2476fn call_entity_description(
2477 conn: &Connection,
2478 namespace: &str,
2479 entity_name: &str,
2480 binary: &Path,
2481 model: Option<&str>,
2482 timeout: u64,
2483 mode: &EnrichMode,
2484) -> Result<EnrichItemResult, AppError> {
2485 let (entity_id, entity_type): (i64, String) = conn
2486 .query_row(
2487 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2488 rusqlite::params![namespace, entity_name],
2489 |r| Ok((r.get(0)?, r.get(1)?)),
2490 )
2491 .map_err(|e| match e {
2492 rusqlite::Error::QueryReturnedNoRows => {
2493 AppError::NotFound(format!("entity '{entity_name}' not found"))
2494 }
2495 other => AppError::Database(other),
2496 })?;
2497
2498 let prompt = format!(
2499 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2500 );
2501
2502 let (value, cost, is_oauth) = match mode {
2503 EnrichMode::ClaudeCode => call_claude(
2504 binary,
2505 &prompt,
2506 ENTITY_DESCRIPTION_SCHEMA,
2507 "",
2508 model,
2509 timeout,
2510 )?,
2511 EnrichMode::Codex => call_codex(
2512 binary,
2513 &prompt,
2514 ENTITY_DESCRIPTION_SCHEMA,
2515 "",
2516 model,
2517 timeout,
2518 )?,
2519 };
2520
2521 let description = value
2522 .get("description")
2523 .and_then(|v| v.as_str())
2524 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2525
2526 persist_entity_description(conn, entity_id, description)?;
2527
2528 Ok(EnrichItemResult::Done {
2529 memory_id: None,
2530 entity_id: Some(entity_id),
2531 entities: 0,
2532 rels: 0,
2533 chars_before: None,
2534 chars_after: None,
2535 cost,
2536 is_oauth,
2537 })
2538}
2539
2540#[allow(clippy::too_many_arguments)]
2541fn call_body_enrich(
2542 conn: &Connection,
2543 namespace: &str,
2544 memory_name: &str,
2545 binary: &Path,
2546 model: Option<&str>,
2547 timeout: u64,
2548 mode: &EnrichMode,
2549 min_output_chars: usize,
2550 max_output_chars: usize,
2551 prompt_template: Option<&Path>,
2552 preserve_threshold: f64,
2553 paths: &crate::paths::AppPaths,
2554 llm_backend: crate::cli::LlmBackendChoice,
2555) -> Result<EnrichItemResult, AppError> {
2556 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2557 .query_row(
2558 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2559 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2560 rusqlite::params![namespace, memory_name],
2561 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2562 )
2563 .map_err(|e| match e {
2564 rusqlite::Error::QueryReturnedNoRows => {
2565 AppError::NotFound(format!("memory '{memory_name}' not found"))
2566 }
2567 other => AppError::Database(other),
2568 })?;
2569
2570 let chars_before = body.chars().count();
2571
2572 let linked_entities: Vec<String> = {
2574 let mut stmt = conn.prepare_cached(
2575 "SELECT e.name FROM memory_entities me \
2576 JOIN entities e ON e.id = me.entity_id \
2577 WHERE me.memory_id = ?1 LIMIT 10",
2578 )?;
2579 let result: Vec<String> = stmt
2580 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2581 .filter_map(|r| r.ok())
2582 .collect();
2583 drop(stmt);
2584 result
2585 };
2586
2587 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2589 let file_size = std::fs::metadata(tmpl_path)
2590 .map_err(|e| {
2591 AppError::Io(std::io::Error::new(
2592 e.kind(),
2593 format!("failed to stat prompt template: {e}"),
2594 ))
2595 })?
2596 .len();
2597 if file_size > MAX_MEMORY_BODY_LEN as u64 {
2598 return Err(AppError::LimitExceeded(
2599 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2600 ));
2601 }
2602 std::fs::read_to_string(tmpl_path).map_err(|e| {
2603 AppError::Io(std::io::Error::new(
2604 e.kind(),
2605 format!("failed to read prompt template: {e}"),
2606 ))
2607 })?
2608 } else {
2609 BODY_ENRICH_PROMPT_PREFIX.to_string()
2610 };
2611
2612 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2614 let mut ctx = String::new();
2615 ctx.push_str(&format!(
2616 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2617 ));
2618 if !description.is_empty() {
2619 ctx.push_str(&format!("- Description: {description}\n"));
2620 }
2621 ctx.push_str(&format!("- Domain: {namespace}\n"));
2622 if !linked_entities.is_empty() {
2623 ctx.push_str(&format!(
2624 "- Linked entities: {}\n",
2625 linked_entities.join(", ")
2626 ));
2627 }
2628 ctx
2629 } else {
2630 String::new()
2631 };
2632
2633 let prompt = format!(
2634 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2635 );
2636
2637 let (value, cost, is_oauth) = match mode {
2639 EnrichMode::ClaudeCode => {
2640 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2641 }
2642 EnrichMode::Codex => {
2643 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2644 }
2645 };
2646
2647 let enriched_body = value
2648 .get("enriched_body")
2649 .and_then(|v| v.as_str())
2650 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2651
2652 let chars_after = enriched_body.chars().count();
2653
2654 let threshold = preserve_threshold;
2661 let verdict =
2662 crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2663 if !verdict.is_accepted() {
2664 return Ok(EnrichItemResult::PreservationFailed {
2665 score: match verdict {
2666 crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2667 crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2668 crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2669 },
2670 threshold,
2671 chars_before,
2672 chars_after,
2673 });
2674 }
2675
2676 let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2682 let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2683 if old_hash == new_hash {
2684 return Ok(EnrichItemResult::Skipped {
2685 reason: format!(
2686 "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2687 ),
2688 });
2689 }
2690
2691 if chars_after <= chars_before {
2693 return Ok(EnrichItemResult::Skipped {
2694 reason: format!(
2695 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2696 ),
2697 });
2698 }
2699
2700 persist_enriched_body(
2701 conn,
2702 namespace,
2703 memory_id,
2704 memory_name,
2705 enriched_body,
2706 paths,
2707 llm_backend,
2708 )?;
2709
2710 Ok(EnrichItemResult::Done {
2711 memory_id: Some(memory_id),
2712 entity_id: None,
2713 entities: 0,
2714 rels: 0,
2715 chars_before: Some(chars_before),
2716 chars_after: Some(chars_after),
2717 cost,
2718 is_oauth,
2719 })
2720}
2721
2722fn call_reembed(
2723 conn: &Connection,
2724 namespace: &str,
2725 memory_name: &str,
2726 paths: &crate::paths::AppPaths,
2727 llm_backend: crate::cli::LlmBackendChoice,
2728) -> Result<EnrichItemResult, AppError> {
2729 let (memory_id, body, memory_type): (i64, String, String) = conn
2730 .query_row(
2731 "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2732 FROM memories
2733 WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2734 rusqlite::params![namespace, memory_name],
2735 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2736 )
2737 .map_err(|e| match e {
2738 rusqlite::Error::QueryReturnedNoRows => {
2739 AppError::NotFound(format!("memory '{memory_name}' not found"))
2740 }
2741 other => AppError::Database(other),
2742 })?;
2743
2744 if body.trim().is_empty() {
2745 return Ok(EnrichItemResult::Skipped {
2746 reason: "body is empty".to_string(),
2747 });
2748 }
2749
2750 reembed_memory_vector(
2751 conn,
2752 namespace,
2753 memory_id,
2754 memory_name,
2755 &memory_type,
2756 &body,
2757 paths,
2758 llm_backend,
2759 )?;
2760
2761 Ok(EnrichItemResult::Done {
2762 memory_id: Some(memory_id),
2763 entity_id: None,
2764 entities: 0,
2765 rels: 0,
2766 chars_before: Some(body.chars().count()),
2767 chars_after: Some(body.chars().count()),
2768 cost: 0.0,
2769 is_oauth: true,
2770 })
2771}
2772
2773fn scan_operation(
2778 conn: &Connection,
2779 namespace: &str,
2780 args: &EnrichArgs,
2781) -> Result<Vec<String>, AppError> {
2782 let name_filter = resolve_name_filter(args)?;
2784 match args.operation {
2785 EnrichOperation::MemoryBindings => {
2786 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2787 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2788 }
2789 EnrichOperation::EntityDescriptions => {
2790 let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2791 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2792 }
2793 EnrichOperation::BodyEnrich => {
2794 let rows =
2795 scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2796 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2797 }
2798 EnrichOperation::ReEmbed => {
2799 let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
2800 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2801 }
2802 EnrichOperation::WeightCalibrate => {
2803 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2804 Ok(rows
2805 .into_iter()
2806 .map(|(id, _, _, _, _)| id.to_string())
2807 .collect())
2808 }
2809 EnrichOperation::RelationReclassify => {
2810 let rows = scan_generic_relations(conn, namespace, args.limit)?;
2811 Ok(rows
2812 .into_iter()
2813 .map(|(id, _, _, _)| id.to_string())
2814 .collect())
2815 }
2816 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2817 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2818 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2819 }
2820 EnrichOperation::EntityTypeValidate => {
2821 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2822 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2823 }
2824 EnrichOperation::DescriptionEnrich => {
2825 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2826 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2827 }
2828 EnrichOperation::DomainClassify
2829 | EnrichOperation::GraphAudit
2830 | EnrichOperation::DeepResearchSynth
2831 | EnrichOperation::BodyExtract => {
2832 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2833 let sql = format!(
2834 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2835 );
2836 let mut stmt = conn.prepare(&sql)?;
2837 let names = stmt
2838 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2839 .collect::<Result<Vec<_>, _>>()?;
2840 Ok(names)
2841 }
2842 }
2843}
2844
2845fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2851 if let Some(p) = explicit {
2852 if p.exists() {
2853 return Ok(p.to_path_buf());
2854 }
2855 return Err(AppError::Validation(format!(
2856 "Codex binary not found at explicit path: {}",
2857 p.display()
2858 )));
2859 }
2860
2861 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2862 let p = PathBuf::from(&env_path);
2863 if p.exists() {
2864 return Ok(p);
2865 }
2866 }
2867
2868 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2869 if let Some(path_var) = std::env::var_os("PATH") {
2870 for dir in std::env::split_paths(&path_var) {
2871 let candidate = dir.join(name);
2872 if candidate.exists() {
2873 return Ok(crate::extract::llm_embedding::resolve_real_binary(
2874 &candidate,
2875 ));
2876 }
2877 }
2878 }
2879
2880 Err(AppError::Validation(
2881 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2882 ))
2883}
2884
2885fn call_weight_calibrate(
2887 conn: &Connection,
2888 _namespace: &str,
2889 item_key: &str,
2890 binary: &Path,
2891 model: Option<&str>,
2892 timeout: u64,
2893 mode: &EnrichMode,
2894) -> Result<EnrichItemResult, AppError> {
2895 let rel_id: i64 = item_key
2896 .parse()
2897 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2898 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2899 .query_row(
2900 "SELECT e1.name, e2.name, r.relation, r.weight \
2901 FROM relationships r \
2902 JOIN entities e1 ON e1.id = r.source_id \
2903 JOIN entities e2 ON e2.id = r.target_id \
2904 WHERE r.id = ?1",
2905 rusqlite::params![rel_id],
2906 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2907 )
2908 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2909
2910 let input_text = format!(
2911 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2912 );
2913 let (value, cost, is_oauth) = match mode {
2914 EnrichMode::ClaudeCode => call_claude(
2915 binary,
2916 WEIGHT_CALIBRATE_PROMPT,
2917 WEIGHT_CALIBRATE_SCHEMA,
2918 &input_text,
2919 model,
2920 timeout,
2921 )?,
2922 EnrichMode::Codex => call_codex(
2923 binary,
2924 WEIGHT_CALIBRATE_PROMPT,
2925 WEIGHT_CALIBRATE_SCHEMA,
2926 &input_text,
2927 model,
2928 timeout,
2929 )?,
2930 };
2931
2932 let calibrated = value
2933 .get("calibrated_weight")
2934 .and_then(|v| v.as_f64())
2935 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2936
2937 conn.execute(
2938 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2939 rusqlite::params![calibrated, rel_id],
2940 )?;
2941
2942 Ok(EnrichItemResult::Done {
2943 memory_id: None,
2944 entity_id: None,
2945 entities: 0,
2946 rels: 1,
2947 chars_before: None,
2948 chars_after: None,
2949 cost,
2950 is_oauth,
2951 })
2952}
2953
2954fn call_relation_reclassify(
2956 conn: &Connection,
2957 _namespace: &str,
2958 item_key: &str,
2959 binary: &Path,
2960 model: Option<&str>,
2961 timeout: u64,
2962 mode: &EnrichMode,
2963) -> Result<EnrichItemResult, AppError> {
2964 let rel_id: i64 = item_key
2965 .parse()
2966 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2967 let (source_name, target_name, current_relation): (String, String, String) = conn
2968 .query_row(
2969 "SELECT e1.name, e2.name, r.relation \
2970 FROM relationships r \
2971 JOIN entities e1 ON e1.id = r.source_id \
2972 JOIN entities e2 ON e2.id = r.target_id \
2973 WHERE r.id = ?1",
2974 rusqlite::params![rel_id],
2975 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2976 )
2977 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2978
2979 let input_text = format!(
2980 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
2981 );
2982 let (value, cost, is_oauth) = match mode {
2983 EnrichMode::ClaudeCode => call_claude(
2984 binary,
2985 RELATION_RECLASSIFY_PROMPT,
2986 RELATION_RECLASSIFY_SCHEMA,
2987 &input_text,
2988 model,
2989 timeout,
2990 )?,
2991 EnrichMode::Codex => call_codex(
2992 binary,
2993 RELATION_RECLASSIFY_PROMPT,
2994 RELATION_RECLASSIFY_SCHEMA,
2995 &input_text,
2996 model,
2997 timeout,
2998 )?,
2999 };
3000
3001 let new_relation = value
3002 .get("relation")
3003 .and_then(|v| v.as_str())
3004 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3005 let new_strength = value
3006 .get("strength")
3007 .and_then(|v| v.as_f64())
3008 .unwrap_or(0.5);
3009
3010 conn.execute(
3011 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3012 rusqlite::params![new_relation, new_strength, rel_id],
3013 )?;
3014
3015 Ok(EnrichItemResult::Done {
3016 memory_id: None,
3017 entity_id: None,
3018 entities: 0,
3019 rels: 1,
3020 chars_before: None,
3021 chars_after: None,
3022 cost,
3023 is_oauth,
3024 })
3025}
3026
3027fn call_entity_connect(
3029 conn: &Connection,
3030 namespace: &str,
3031 item_key: &str,
3032 binary: &Path,
3033 model: Option<&str>,
3034 timeout: u64,
3035 mode: &EnrichMode,
3036) -> Result<EnrichItemResult, AppError> {
3037 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3038 let (e1_id, e1_name, e2_id, e2_name) =
3039 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3040 Some(p) => p,
3041 None => {
3042 return Ok(EnrichItemResult::Skipped {
3043 reason: "pair no longer isolated".into(),
3044 })
3045 }
3046 };
3047 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3048 let (value, cost, is_oauth) = match mode {
3049 EnrichMode::ClaudeCode => call_claude(
3050 binary,
3051 ENTITY_CONNECT_PROMPT,
3052 ENTITY_CONNECT_SCHEMA,
3053 &input_text,
3054 model,
3055 timeout,
3056 )?,
3057 EnrichMode::Codex => call_codex(
3058 binary,
3059 ENTITY_CONNECT_PROMPT,
3060 ENTITY_CONNECT_SCHEMA,
3061 &input_text,
3062 model,
3063 timeout,
3064 )?,
3065 };
3066 let relation = value
3067 .get("relation")
3068 .and_then(|v| v.as_str())
3069 .unwrap_or("none");
3070 if relation == "none" {
3071 return Ok(EnrichItemResult::Skipped {
3072 reason: "LLM determined no relationship".into(),
3073 });
3074 }
3075 let strength = value
3076 .get("strength")
3077 .and_then(|v| v.as_f64())
3078 .unwrap_or(0.5);
3079 conn.execute(
3080 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3081 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3082 )?;
3083 Ok(EnrichItemResult::Done {
3084 memory_id: None,
3085 entity_id: None,
3086 entities: 0,
3087 rels: 1,
3088 chars_before: None,
3089 chars_after: None,
3090 cost,
3091 is_oauth,
3092 })
3093}
3094
3095fn call_entity_type_validate(
3097 conn: &Connection,
3098 _namespace: &str,
3099 item_key: &str,
3100 binary: &Path,
3101 model: Option<&str>,
3102 timeout: u64,
3103 mode: &EnrichMode,
3104) -> Result<EnrichItemResult, AppError> {
3105 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3106 .query_row(
3107 "SELECT id, name, type FROM entities WHERE name = ?1",
3108 rusqlite::params![item_key],
3109 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3110 )
3111 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3112 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3113 let (value, cost, is_oauth) = match mode {
3114 EnrichMode::ClaudeCode => call_claude(
3115 binary,
3116 ENTITY_TYPE_VALIDATE_PROMPT,
3117 ENTITY_TYPE_VALIDATE_SCHEMA,
3118 &input_text,
3119 model,
3120 timeout,
3121 )?,
3122 EnrichMode::Codex => call_codex(
3123 binary,
3124 ENTITY_TYPE_VALIDATE_PROMPT,
3125 ENTITY_TYPE_VALIDATE_SCHEMA,
3126 &input_text,
3127 model,
3128 timeout,
3129 )?,
3130 };
3131 let validated_type = value
3132 .get("validated_type")
3133 .and_then(|v| v.as_str())
3134 .unwrap_or(&ent_type);
3135 let was_correct = value
3136 .get("was_correct")
3137 .and_then(|v| v.as_bool())
3138 .unwrap_or(true);
3139 if !was_correct {
3140 conn.execute(
3141 "UPDATE entities SET type = ?1 WHERE id = ?2",
3142 rusqlite::params![validated_type, ent_id],
3143 )?;
3144 }
3145 Ok(EnrichItemResult::Done {
3146 memory_id: None,
3147 entity_id: Some(ent_id),
3148 entities: 1,
3149 rels: 0,
3150 chars_before: None,
3151 chars_after: None,
3152 cost,
3153 is_oauth,
3154 })
3155}
3156
3157fn call_description_enrich(
3159 conn: &Connection,
3160 _namespace: &str,
3161 item_key: &str,
3162 binary: &Path,
3163 model: Option<&str>,
3164 timeout: u64,
3165 mode: &EnrichMode,
3166) -> Result<EnrichItemResult, AppError> {
3167 let (mem_id, body, old_desc): (i64, String, String) = conn
3168 .query_row(
3169 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3170 rusqlite::params![item_key],
3171 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3172 )
3173 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3174 let snippet: String = body.chars().take(500).collect();
3175 let input_text = format!(
3176 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3177 );
3178 let (value, cost, is_oauth) = match mode {
3179 EnrichMode::ClaudeCode => call_claude(
3180 binary,
3181 DESCRIPTION_ENRICH_PROMPT,
3182 DESCRIPTION_ENRICH_SCHEMA,
3183 &input_text,
3184 model,
3185 timeout,
3186 )?,
3187 EnrichMode::Codex => call_codex(
3188 binary,
3189 DESCRIPTION_ENRICH_PROMPT,
3190 DESCRIPTION_ENRICH_SCHEMA,
3191 &input_text,
3192 model,
3193 timeout,
3194 )?,
3195 };
3196 let new_desc = value
3197 .get("description")
3198 .and_then(|v| v.as_str())
3199 .unwrap_or(&old_desc);
3200 conn.execute(
3201 "UPDATE memories SET description = ?1 WHERE id = ?2",
3202 rusqlite::params![new_desc, mem_id],
3203 )?;
3204 Ok(EnrichItemResult::Done {
3205 memory_id: Some(mem_id),
3206 entity_id: None,
3207 entities: 0,
3208 rels: 0,
3209 chars_before: Some(old_desc.len()),
3210 chars_after: Some(new_desc.len()),
3211 cost,
3212 is_oauth,
3213 })
3214}
3215
3216fn call_domain_classify(
3218 conn: &Connection,
3219 _namespace: &str,
3220 item_key: &str,
3221 binary: &Path,
3222 model: Option<&str>,
3223 timeout: u64,
3224 mode: &EnrichMode,
3225) -> Result<EnrichItemResult, AppError> {
3226 let (mem_id, body, desc): (i64, String, String) = conn
3227 .query_row(
3228 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3229 rusqlite::params![item_key],
3230 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3231 )
3232 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3233 let snippet: String = body.chars().take(500).collect();
3234 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3235 let (value, cost, is_oauth) = match mode {
3236 EnrichMode::ClaudeCode => call_claude(
3237 binary,
3238 DOMAIN_CLASSIFY_PROMPT,
3239 DOMAIN_CLASSIFY_SCHEMA,
3240 &input_text,
3241 model,
3242 timeout,
3243 )?,
3244 EnrichMode::Codex => call_codex(
3245 binary,
3246 DOMAIN_CLASSIFY_PROMPT,
3247 DOMAIN_CLASSIFY_SCHEMA,
3248 &input_text,
3249 model,
3250 timeout,
3251 )?,
3252 };
3253 let domain = value
3254 .get("domain")
3255 .and_then(|v| v.as_str())
3256 .unwrap_or("uncategorized");
3257 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3258 conn.execute(
3259 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3260 rusqlite::params![metadata, mem_id],
3261 )?;
3262 Ok(EnrichItemResult::Done {
3263 memory_id: Some(mem_id),
3264 entity_id: None,
3265 entities: 0,
3266 rels: 0,
3267 chars_before: None,
3268 chars_after: None,
3269 cost,
3270 is_oauth,
3271 })
3272}
3273
3274fn call_graph_audit(
3276 conn: &Connection,
3277 _namespace: &str,
3278 item_key: &str,
3279 binary: &Path,
3280 model: Option<&str>,
3281 timeout: u64,
3282 mode: &EnrichMode,
3283) -> Result<EnrichItemResult, AppError> {
3284 let (mem_id, body, desc): (i64, String, String) = conn
3285 .query_row(
3286 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3287 rusqlite::params![item_key],
3288 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3289 )
3290 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3291 let snippet: String = body.chars().take(500).collect();
3292 let ent_count: i64 = conn
3293 .query_row(
3294 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3295 rusqlite::params![mem_id],
3296 |r| r.get(0),
3297 )
3298 .unwrap_or(0);
3299 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3300 let (value, cost, is_oauth) = match mode {
3301 EnrichMode::ClaudeCode => call_claude(
3302 binary,
3303 GRAPH_AUDIT_PROMPT,
3304 GRAPH_AUDIT_SCHEMA,
3305 &input_text,
3306 model,
3307 timeout,
3308 )?,
3309 EnrichMode::Codex => call_codex(
3310 binary,
3311 GRAPH_AUDIT_PROMPT,
3312 GRAPH_AUDIT_SCHEMA,
3313 &input_text,
3314 model,
3315 timeout,
3316 )?,
3317 };
3318 let issues = value
3319 .get("issues")
3320 .and_then(|v| v.as_array())
3321 .map(|a| a.len())
3322 .unwrap_or(0);
3323 Ok(EnrichItemResult::Done {
3324 memory_id: Some(mem_id),
3325 entity_id: None,
3326 entities: 0,
3327 rels: issues,
3328 chars_before: None,
3329 chars_after: None,
3330 cost,
3331 is_oauth,
3332 })
3333}
3334
3335fn call_deep_research_synth(
3337 conn: &Connection,
3338 namespace: &str,
3339 item_key: &str,
3340 binary: &Path,
3341 model: Option<&str>,
3342 timeout: u64,
3343 mode: &EnrichMode,
3344) -> Result<EnrichItemResult, AppError> {
3345 let (mem_id, body): (i64, String) = conn
3346 .query_row(
3347 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3348 rusqlite::params![item_key],
3349 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3350 )
3351 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3352 let snippet: String = body.chars().take(2000).collect();
3353 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3354 let (value, cost, is_oauth) = match mode {
3355 EnrichMode::ClaudeCode => call_claude(
3356 binary,
3357 DEEP_RESEARCH_SYNTH_PROMPT,
3358 DEEP_RESEARCH_SYNTH_SCHEMA,
3359 &input_text,
3360 model,
3361 timeout,
3362 )?,
3363 EnrichMode::Codex => call_codex(
3364 binary,
3365 DEEP_RESEARCH_SYNTH_PROMPT,
3366 DEEP_RESEARCH_SYNTH_SCHEMA,
3367 &input_text,
3368 model,
3369 timeout,
3370 )?,
3371 };
3372 let mut ent_count = 0usize;
3373 let mut rel_count = 0usize;
3374 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3375 for e in ents {
3376 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3377 let etype_str = e
3378 .get("entity_type")
3379 .and_then(|v| v.as_str())
3380 .unwrap_or("concept");
3381 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3382 if name.len() >= 2 {
3383 let ne = NewEntity {
3384 name: name.to_string(),
3385 entity_type: etype,
3386 description: None,
3387 };
3388 let _ = entities::upsert_entity(conn, namespace, &ne);
3389 ent_count += 1;
3390 }
3391 }
3392 }
3393 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3394 for r in rels {
3395 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3396 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3397 if src.is_empty() || tgt.is_empty() {
3398 continue;
3399 }
3400 let rel = r
3401 .get("relation")
3402 .and_then(|v| v.as_str())
3403 .unwrap_or("related");
3404 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3405 if let (Some(sid), Some(tid)) = (
3406 entities::find_entity_id(conn, namespace, src)?,
3407 entities::find_entity_id(conn, namespace, tgt)?,
3408 ) {
3409 let _ = entities::create_or_fetch_relationship(
3410 conn, namespace, sid, tid, rel, str_, None,
3411 );
3412 rel_count += 1;
3413 }
3414 }
3415 }
3416 Ok(EnrichItemResult::Done {
3417 memory_id: Some(mem_id),
3418 entity_id: None,
3419 entities: ent_count,
3420 rels: rel_count,
3421 chars_before: None,
3422 chars_after: None,
3423 cost,
3424 is_oauth,
3425 })
3426}
3427
3428fn call_body_extract(
3430 conn: &Connection,
3431 _namespace: &str,
3432 item_key: &str,
3433 binary: &Path,
3434 model: Option<&str>,
3435 timeout: u64,
3436 mode: &EnrichMode,
3437) -> Result<EnrichItemResult, AppError> {
3438 let (mem_id, body): (i64, String) = conn
3439 .query_row(
3440 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3441 rusqlite::params![item_key],
3442 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3443 )
3444 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3445 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3446 let (value, cost, is_oauth) = match mode {
3447 EnrichMode::ClaudeCode => call_claude(
3448 binary,
3449 BODY_EXTRACT_PROMPT,
3450 BODY_EXTRACT_SCHEMA,
3451 &input_text,
3452 model,
3453 timeout,
3454 )?,
3455 EnrichMode::Codex => call_codex(
3456 binary,
3457 BODY_EXTRACT_PROMPT,
3458 BODY_EXTRACT_SCHEMA,
3459 &input_text,
3460 model,
3461 timeout,
3462 )?,
3463 };
3464 let restructured = value
3465 .get("restructured_body")
3466 .and_then(|v| v.as_str())
3467 .unwrap_or(&body);
3468 let chars_before = body.len();
3469 let chars_after = restructured.len();
3470 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3471 conn.execute(
3472 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3473 rusqlite::params![restructured, new_hash, mem_id],
3474 )?;
3475 Ok(EnrichItemResult::Done {
3476 memory_id: Some(mem_id),
3477 entity_id: None,
3478 entities: 0,
3479 rels: 0,
3480 chars_before: Some(chars_before),
3481 chars_after: Some(chars_after),
3482 cost,
3483 is_oauth,
3484 })
3485}
3486
3487#[allow(clippy::type_complexity)]
3489fn scan_isolated_entity_pairs(
3490 conn: &Connection,
3491 namespace: &str,
3492 limit: Option<usize>,
3493) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3494 let limit_val = limit.unwrap_or(50) as i64;
3495 let mut stmt = conn.prepare_cached(
3496 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3497 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3498 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3499 (r.source_id = e1.id AND r.target_id = e2.id) OR \
3500 (r.source_id = e2.id AND r.target_id = e1.id)) \
3501 LIMIT ?2",
3502 )?;
3503 let rows = stmt
3504 .query_map(rusqlite::params![namespace, limit_val], |r| {
3505 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3506 })?
3507 .collect::<Result<Vec<_>, _>>()?;
3508 Ok(rows)
3509}
3510
3511fn scan_entities_for_type_validation(
3513 conn: &Connection,
3514 namespace: &str,
3515 limit: Option<usize>,
3516) -> Result<Vec<(i64, String, String)>, AppError> {
3517 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3518 let sql = format!(
3519 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3520 );
3521 let mut stmt = conn.prepare(&sql)?;
3522 let rows = stmt
3523 .query_map(rusqlite::params![namespace], |r| {
3524 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3525 })?
3526 .collect::<Result<Vec<_>, _>>()?;
3527 Ok(rows)
3528}
3529
3530fn scan_generic_descriptions(
3532 conn: &Connection,
3533 namespace: &str,
3534 limit: Option<usize>,
3535) -> Result<Vec<(i64, String, String)>, AppError> {
3536 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3537 let sql = format!(
3538 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3539 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3540 ORDER BY id {limit_clause}"
3541 );
3542 let mut stmt = conn.prepare(&sql)?;
3543 let rows = stmt
3544 .query_map(rusqlite::params![namespace], |r| {
3545 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3546 })?
3547 .collect::<Result<Vec<_>, _>>()?;
3548 Ok(rows)
3549}
3550
3551fn call_codex(
3555 binary: &Path,
3556 prompt: &str,
3557 json_schema: &str,
3558 input_text: &str,
3559 model: Option<&str>,
3560 timeout_secs: u64,
3561) -> Result<(serde_json::Value, f64, bool), AppError> {
3562 use wait_timeout::ChildExt;
3563
3564 super::codex_spawn::validate_codex_model(model)?;
3569 let schema_file = super::codex_spawn::trusted_schema_path()?;
3570
3571 let args = super::codex_spawn::CodexSpawnArgs {
3572 binary,
3573 prompt,
3574 json_schema,
3575 input_text,
3576 model,
3577 timeout_secs,
3578 schema_path: schema_file.clone(),
3579 };
3580 let mut cmd = super::codex_spawn::build_codex_command(&args);
3581
3582 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3583 AppError::Io(std::io::Error::new(
3584 e.kind(),
3585 format!("failed to spawn codex: {e}"),
3586 ))
3587 })?;
3588
3589 let full_prompt = format!("{prompt}\n\n{input_text}");
3590 let stdin_bytes = full_prompt.into_bytes();
3591 let mut child_stdin = child
3592 .stdin
3593 .take()
3594 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3595 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3596 child_stdin.write_all(&stdin_bytes)?;
3597 drop(child_stdin);
3598 Ok(())
3599 });
3600
3601 let start = std::time::Instant::now();
3602 let timeout = std::time::Duration::from_secs(timeout_secs);
3603 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3604 let _ = std::fs::remove_file(&schema_file);
3605
3606 match status {
3607 Some(exit_status) => {
3608 stdin_thread
3609 .join()
3610 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3611 .map_err(AppError::Io)?;
3612
3613 tracing::debug!(
3614 target: "process",
3615 exit_code = ?exit_status.code(),
3616 elapsed_ms = start.elapsed().as_millis() as u64,
3617 "external process completed"
3618 );
3619
3620 let mut stdout_buf = Vec::new();
3621 if let Some(mut out) = child.stdout.take() {
3622 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3623 }
3624 if !exit_status.success() {
3625 let mut stderr_buf = Vec::new();
3626 if let Some(mut err) = child.stderr.take() {
3627 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3628 }
3629 let stderr_str = String::from_utf8_lossy(&stderr_buf);
3630 tracing::warn!(
3631 target: "enrich",
3632 exit_code = ?exit_status.code(),
3633 stderr = %stderr_str.trim(),
3634 "codex process failed"
3635 );
3636 return Err(AppError::Validation(format!(
3637 "codex exited with code {:?}: {}",
3638 exit_status.code(),
3639 stderr_str.trim()
3640 )));
3641 }
3642 let stdout_str = String::from_utf8(stdout_buf)
3643 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3644 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3647 let value: serde_json::Value =
3653 serde_json::from_str(&result.last_agent_text).map_err(|e| {
3654 AppError::Validation(format!(
3655 "codex agent_message is not valid JSON: {e}; raw={}",
3656 result.last_agent_text
3657 ))
3658 })?;
3659 Ok((value, 0.0, false))
3660 }
3661 None => {
3662 let _ = child.kill();
3663 let _ = child.wait();
3664 let _ = stdin_thread.join();
3665 Err(AppError::Validation(format!(
3666 "codex timed out after {timeout_secs} seconds"
3667 )))
3668 }
3669 }
3670}
3671
#[cfg(test)]
mod tests {
    // Unit tests for the scan helpers, the queue database bootstrap, Claude
    // output parsing, the Codex subprocess wrapper and the JSON schemas.
    // Database-backed tests run against the in-memory schema built by
    // `open_test_db`.
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory SQLite database containing the `memories`,
    /// `entities`, `memory_entities`, `relationships` and `memory_embeddings`
    /// tables used by the tests below.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL DEFAULT '',
                dim INTEGER NOT NULL DEFAULT 384,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// A memory with no `memory_entities` row is reported as unbound.
    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    /// A memory linked to an entity through `memory_entities` is not reported.
    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    /// An entity whose description is NULL is picked up by the scan.
    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    /// An entity that already has a description is left out of the scan.
    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    /// A body shorter than the threshold is reported.
    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    /// A body longer than the threshold is not reported.
    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    /// With five matching rows and a limit of three, exactly three come back.
    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                "INSERT INTO memories (namespace, name, body) VALUES ('global', ?1, 'short')",
                rusqlite::params![format!("mem-{i}")],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    /// Only the memory without a stored embedding is reported.
    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
            [],
        )
        .unwrap();
        let memory_id: i64 = conn
            .query_row(
                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    /// When a name filter is given, only the listed memory is reported.
    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
            [],
        )
        .unwrap();

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    /// A freshly opened queue database has an empty `queue` table.
    #[test]
    fn queue_db_schema_creates_correctly() {
        // A private temp directory keeps the test portable, avoids clashes with
        // stale files, and is removed (with any SQLite side files) even when an
        // assertion fails.
        let tmp = tempfile::tempdir().expect("tempdir");
        let tmp_path = tmp
            .path()
            .join("enrich-queue.sqlite")
            .to_string_lossy()
            .into_owned();
        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    /// A successful result event yields the structured output, its cost, and
    /// no OAuth flag.
    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    /// An init event with `apiKeySource` set to `none` is reported as OAuth.
    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    /// An error result mentioning a rate limit maps to `AppError::RateLimited`.
    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Other error results keep the original error text in the message.
    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// `call_codex` returns the agent message JSON as-is, whatever its shape;
    /// a mock shell script stands in for the Codex binary.
    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    /// Exercises only the scan step that a dry run relies on; no LLM binary is
    /// invoked by this test.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    /// `persist_entity_description` writes the description onto the entity row.
    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    /// The bindings schema constant parses as JSON.
    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    /// The entity-description schema constant parses as JSON.
    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    /// The body-enrich schema constant parses as JSON.
    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}