1use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::time::Instant;
38
/// Default file name for the enrich queue database.
/// NOTE(review): presumably the path handed to `open_queue_db`; the call site is outside this view.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default rate-limit wait.
/// NOTE(review): unit is presumably seconds — confirm against the consumer.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default for `--min-output-chars` (see `EnrichArgs::min_output_chars`).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default for `--max-output-chars` (see `EnrichArgs::max_output_chars`).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
47
/// JSON Schema for the entity/relationship extraction response.
///
/// The response is an object with two required arrays:
/// - `entities`: objects with `name` and an `entity_type` restricted to a fixed enum;
/// - `relationships`: objects with `source`, `target`, a `relation` restricted to a fixed enum,
///   and a `strength` in `[0, 1]`.
///
/// Every object level forbids additional properties.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
91
/// JSON Schema for an entity-description response: an object with a single required
/// string field `description` and no additional properties.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for a body-enrich response: an object with a single required string field
/// `enriched_body` and no additional properties.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
109
/// Prompt asking the model to re-evaluate a relationship weight against a four-step scale
/// (0.9 / 0.7 / 0.5 / 0.3).
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema for the weight-calibrate response: `calibrated_weight` in `[0.0, 1.0]` plus a
/// free-text `reasoning`, both required.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
128
/// Prompt asking the model to replace a generic relation with one of the eleven canonical
/// relations it lists.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema for the relation-reclassify response: `relation`, `strength` in `[0.0, 1.0]`
/// and `reasoning`, all required.
///
/// NOTE(review): `relation` is an unconstrained string here, whereas `BINDINGS_SCHEMA`
/// restricts it to an enum; the prompt alone enforces the canonical list.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
155
/// Prompt asking whether two currently unconnected entities should be linked; the model is
/// told to answer with the literal relation `"none"` when no meaningful link exists.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema for the entity-connect response: `relation` (free string, so `"none"` is
/// representable), `strength` in `[0.0, 1.0]` and `reasoning`, all required.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
172
/// Prompt asking the model to confirm or correct an entity's type against the ten entity
/// types it lists.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema for the entity-type-validate response: `validated_type` (free string),
/// boolean `was_correct` and `reasoning`, all required.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
189
/// Prompt asking the model to replace a generic memory description with a 10-20 word
/// semantic one; includes one bad and one good example.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema for the description-enrich response: required strings `description` and
/// `reasoning`.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
205
/// Prompt asking the model to assign a memory to a primary domain, named in kebab-case.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema for the domain-classify response: `domain`, `confidence` in `[0.0, 1.0]` and
/// `reasoning`, all required.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
220
/// Prompt asking the model to audit one memory and its entity bindings, returning a list of
/// issues and an overall quality score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";
225
/// JSON Schema for the graph-audit response: `quality_score` in `[0.0, 1.0]`, an `issues`
/// array of `{ kind, detail }` objects, and `reasoning`, all required.
///
/// The nested issue object also sets `"additionalProperties": false`, matching
/// `BINDINGS_SCHEMA`: strict structured-output backends reject schemas in which any object
/// level leaves additional properties open.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"], "additionalProperties": false } },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
236
/// Prompt asking the model to synthesize entities, relationships and a summary from a
/// memory body, using kebab-case entity names and the listed relation vocabulary.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";
242
/// JSON Schema for the deep-research-synth response: `entities` (`{ name, entity_type }`),
/// `relationships` (`{ source, target, relation, strength }`) and `summary`, all required.
///
/// Both nested item objects also set `"additionalProperties": false`, matching
/// `BINDINGS_SCHEMA`: strict structured-output backends reject schemas in which any object
/// level leaves additional properties open.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"], "additionalProperties": false } },
    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"], "additionalProperties": false } },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
253
/// Prompt asking the model to restructure an unstructured memory body into clean markdown
/// while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema for the body-extract response: required strings `restructured_body` and
/// `changes_summary`.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
268
/// Prompt for entity/relationship extraction from a memory body.
///
/// The relation vocabulary and the strength scale spelled out here mirror the `relation`
/// enum and `strength` range declared in `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";
283
/// Prompt prefix for entity descriptions. It ends with `"Entity name: "`, so the caller is
/// expected to append the entity name (and presumably its type — confirm at the call site).
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for body enrichment. It ends with `"Memory body to enrich:\n\n"`, so the
/// caller is expected to append the memory body.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
287
// Enrichment operation selected with `--operation` / `-o`.
//
// Serialized by serde in kebab-case (`MemoryBindings` -> "memory-bindings"); the CLI examples
// on `EnrichArgs` use the same kebab-case spellings.
//
// Plain `//` comments are used deliberately: `///` doc comments on `clap::ValueEnum` variants
// become per-value help text and would change `--help` output.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Extract entities/relationships for memories that have no entity bindings.
    MemoryBindings,
    // Fill in missing entity descriptions.
    EntityDescriptions,
    // Expand short memory bodies.
    BodyEnrich,
    // Rebuild missing memory embeddings without rewriting bodies.
    ReEmbed,
    // Re-evaluate relationship weights (see `WEIGHT_CALIBRATE_PROMPT`).
    WeightCalibrate,
    // Replace generic relations with canonical ones (see `RELATION_RECLASSIFY_PROMPT`).
    RelationReclassify,
    // Propose relationships between unconnected entities (see `ENTITY_CONNECT_PROMPT`).
    EntityConnect,
    // Verify entity types (see `ENTITY_TYPE_VALIDATE_PROMPT`).
    EntityTypeValidate,
    // Improve generic memory descriptions (see `DESCRIPTION_ENRICH_PROMPT`).
    DescriptionEnrich,
    // NOTE(review): no prompt/schema for this operation is visible in this part of the file.
    CrossDomainBridges,
    // Classify memories into a domain (see `DOMAIN_CLASSIFY_PROMPT`).
    DomainClassify,
    // Audit a memory and its bindings (see `GRAPH_AUDIT_PROMPT`).
    GraphAudit,
    // Synthesize entities/relationships/summary (see `DEEP_RESEARCH_SYNTH_PROMPT`).
    DeepResearchSynth,
    // Restructure a raw body into markdown (see `BODY_EXTRACT_PROMPT`).
    BodyExtract,
}
325
// LLM backend used for enrichment, selected with `--mode` (and `--fallback-mode`).
//
// Plain `//` comments are used deliberately: `///` doc comments on `clap::ValueEnum` variants
// become per-value help text and would change `--help` output.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // The Claude CLI; rendered as "claude-code" by the `Display` impl.
    ClaudeCode,
    // The Codex CLI; rendered as "codex" by the `Display` impl.
    Codex,
}
334
335impl std::fmt::Display for EnrichMode {
336 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337 match self {
338 EnrichMode::ClaudeCode => write!(f, "claude-code"),
339 EnrichMode::Codex => write!(f, "codex"),
340 }
341 }
342}
343
344#[derive(clap::Args)]
346#[command(
347 about = "Enrich graph memories and entities using an LLM provider",
348 after_long_help = "EXAMPLES:\n \
349 # Add missing entity bindings to all unbound memories\n \
350 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
351 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
352 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
353 # Expand short memory bodies (GAP-18)\n \
354 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
355 # Rebuild only missing memory embeddings without rewriting bodies\n \
356 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
357 # Resume an interrupted body-enrich run\n \
358 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
359 # Retry only failed items from a previous run\n \
360 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
361 EXIT CODES:\n \
362 0 success\n \
363 1 validation error (bad args, binary not found)\n \
364 14 I/O error"
365)]
366pub struct EnrichArgs {
367 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
369 pub operation: EnrichOperation,
370
371 #[arg(long, value_enum, default_value = "claude-code")]
373 pub mode: EnrichMode,
374
375 #[arg(long, value_name = "N")]
377 pub limit: Option<usize>,
378
379 #[arg(long)]
381 pub dry_run: bool,
382
383 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
385 pub namespace: Option<String>,
386
387 #[arg(long, value_name = "PATH")]
390 pub claude_binary: Option<PathBuf>,
391
392 #[arg(long, value_name = "MODEL")]
394 pub claude_model: Option<String>,
395
396 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
398 pub claude_timeout: u64,
399
400 #[arg(long, value_name = "PATH")]
403 pub codex_binary: Option<PathBuf>,
404
405 #[arg(long, value_name = "MODEL")]
407 pub codex_model: Option<String>,
408
409 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
411 pub codex_timeout: u64,
412
413 #[arg(long, value_name = "USD")]
416 pub max_cost_usd: Option<f64>,
417
418 #[arg(long)]
421 pub resume: bool,
422
423 #[arg(long)]
425 pub retry_failed: bool,
426
427 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
430 pub min_output_chars: usize,
431
432 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
434 pub max_output_chars: usize,
435
436 #[arg(long, default_value_t = true)]
438 pub preserve_check: bool,
439
440 #[arg(long, value_name = "PATH")]
442 pub prompt_template: Option<PathBuf>,
443
444 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
448 pub llm_parallelism: u32,
449
450 #[arg(long)]
453 pub json: bool,
454
455 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
457 pub db: Option<String>,
458
459 #[arg(long, value_name = "SECONDS")]
462 pub wait_job_singleton: Option<u64>,
463
464 #[arg(long, default_value_t = false)]
468 pub force_job_singleton: bool,
469
470 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
474 pub names: Vec<String>,
475
476 #[arg(long, value_name = "PATH")]
480 pub names_file: Option<PathBuf>,
481
482 #[arg(long, default_value_t = false)]
486 pub preflight_check: bool,
487
488 #[arg(long, value_enum)]
492 pub fallback_mode: Option<EnrichMode>,
493
494 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
497 pub rate_limit_buffer: u64,
498
499 #[arg(long, default_value_t = true)]
503 pub max_load_check: bool,
504
505 #[arg(long, value_name = "N", default_value_t = 5)]
508 pub circuit_breaker_threshold: u32,
509
510 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
517 pub preserve_threshold: f64,
518
519 #[arg(long, default_value_t = true)]
524 pub codex_model_validate: bool,
525
526 #[arg(long, value_name = "MODEL")]
531 pub codex_model_fallback: Option<String>,
532}
533
/// Serializable progress record describing a phase of the run.
///
/// Every field except `phase` is optional and is omitted from the serialized output when it
/// is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Name of the phase being reported.
    phase: &'a str,
    /// Path of the backend binary, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Version string, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of items, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still pending, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Parallelism in effect (see `EnrichArgs::llm_parallelism`), when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
557
/// Serializable per-item progress record.
///
/// `item`, `status`, `index` and `total` are always serialized; every `Option` field is
/// omitted when `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Identifier of the processed item.
    item: &'a str,
    /// Outcome label for the item.
    status: &'a str,
    /// Row id of the memory, when the item is a memory.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Row id of the entity, when the item is an entity.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities, when the operation reports one.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships, when the operation reports one.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Length before the operation, in characters.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Length after the operation, in characters.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Cost attributed to this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message, when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run.
    /// NOTE(review): whether this is 0- or 1-based is decided by the emitter — confirm.
    index: usize,
    /// Total number of items in the run.
    total: usize,
}
584
/// Serializable end-of-run summary.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field.
    /// NOTE(review): presumably always `true` so consumers can tell the summary apart from
    /// item events — confirm at the construction site.
    summary: bool,
    /// Name of the operation that was run.
    operation: String,
    /// Total number of items considered.
    items_total: usize,
    /// Number of items completed.
    completed: usize,
    /// Number of items that failed.
    failed: usize,
    /// Number of items skipped.
    skipped: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Wall-clock duration in milliseconds.
    elapsed_ms: u64,
    /// Backend that was invoked; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
602
603use crate::output::emit_json_line as emit_json;
604
605fn open_queue_db(path: &str) -> Result<Connection, AppError> {
620 let conn = Connection::open(path)?;
621 conn.pragma_update(None, "journal_mode", "wal")?;
622 conn.execute_batch(
623 "CREATE TABLE IF NOT EXISTS queue (
624 id INTEGER PRIMARY KEY AUTOINCREMENT,
625 item_key TEXT NOT NULL UNIQUE,
626 item_type TEXT NOT NULL DEFAULT 'memory',
627 status TEXT NOT NULL DEFAULT 'pending',
628 memory_id INTEGER,
629 entity_id INTEGER,
630 entities INTEGER DEFAULT 0,
631 rels INTEGER DEFAULT 0,
632 error TEXT,
633 cost_usd REAL DEFAULT 0.0,
634 attempt INTEGER DEFAULT 0,
635 elapsed_ms INTEGER,
636 created_at TEXT DEFAULT (datetime('now')),
637 done_at TEXT
638 );
639 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
640 )?;
641 Ok(conn)
642}
643
644fn call_claude(
652 binary: &Path,
653 prompt: &str,
654 json_schema: &str,
655 input_text: &str,
656 model: Option<&str>,
657 timeout_secs: u64,
658) -> Result<(serde_json::Value, f64, bool), AppError> {
659 let result = crate::commands::claude_runner::run_claude(
660 binary,
661 prompt,
662 json_schema,
663 input_text,
664 model,
665 timeout_secs,
666 7,
667 )?;
668 Ok((result.value, result.cost_usd, result.is_oauth))
669}
670
/// Result of `run_preflight_probe`.
enum PreflightOutcome {
    /// The probe process exited successfully.
    Healthy,
    /// The probe failed and its stderr contained a rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the probe process.
        reason: String,
        /// Static, human-readable hint on how to proceed.
        suggestion: &'static str,
    },
    /// The probe could not be run, timed out, or failed for a non-rate-limit reason.
    Error(AppError),
}
689
690fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
698 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
699
700 match args.mode {
701 EnrichMode::ClaudeCode => {
702 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
703 Ok(b) => b,
704 Err(e) => return PreflightOutcome::Error(e),
705 };
706 let mut cmd = std::process::Command::new(&bin);
707 cmd.env_clear();
708 for var in &["PATH", "HOME", "USER"] {
709 if let Ok(val) = std::env::var(var) {
710 cmd.env(var, val);
711 }
712 }
713 cmd.arg("-p")
714 .arg("ping")
715 .arg("--max-turns")
716 .arg("1")
717 .arg("--strict-mcp-config")
718 .arg("--mcp-config")
719 .arg("{}")
720 .arg("--dangerously-skip-permissions")
721 .arg("--settings")
722 .arg("{\"hooks\":{}}")
723 .arg("--output-format")
724 .arg("json")
725 .stdin(std::process::Stdio::null())
726 .stdout(std::process::Stdio::piped())
727 .stderr(std::process::Stdio::piped());
728
729 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
730 Ok(c) => c,
731 Err(e) => {
732 return PreflightOutcome::Error(AppError::Io(e));
733 }
734 };
735 let output = match wait_with_timeout(child, timeout) {
736 Ok(out) => out,
737 Err(e) => return PreflightOutcome::Error(e),
738 };
739 if !output.status.success() {
740 let stderr = String::from_utf8_lossy(&output.stderr);
741 if stderr.contains("hit your session limit")
742 || stderr.contains("rate_limit")
743 || stderr.contains("429")
744 {
745 return PreflightOutcome::RateLimited {
746 reason: stderr.trim().to_string(),
747 suggestion:
748 "wait for the OAuth window to reset or use --fallback-mode codex",
749 };
750 }
751 return PreflightOutcome::Error(AppError::Validation(format!(
752 "preflight probe failed: {stderr}",
753 stderr = stderr.trim()
754 )));
755 }
756 PreflightOutcome::Healthy
757 }
758 EnrichMode::Codex => {
759 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
760 Ok(b) => b,
761 Err(e) => return PreflightOutcome::Error(e),
762 };
763 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
764 .map_err(PreflightOutcome::Error)
765 .ok();
766 let schema = "{}";
767 let schema_path = match super::codex_spawn::trusted_schema_path() {
768 Ok(p) => p,
769 Err(e) => return PreflightOutcome::Error(e),
770 };
771 let spawn_args = super::codex_spawn::CodexSpawnArgs {
772 binary: &bin,
773 prompt: "ping",
774 json_schema: schema,
775 input_text: "",
776 model: args.codex_model.as_deref(),
777 timeout_secs: args.rate_limit_buffer.max(60),
778 schema_path: schema_path.clone(),
779 };
780 let mut cmd = super::codex_spawn::build_codex_command(&spawn_args);
781 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
782 Ok(c) => c,
783 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
784 };
785 let output = match wait_with_timeout(child, timeout) {
786 Ok(out) => out,
787 Err(e) => return PreflightOutcome::Error(e),
788 };
789 let _ = std::fs::remove_file(&schema_path);
790 if !output.status.success() {
791 let stderr = String::from_utf8_lossy(&output.stderr);
792 if stderr.contains("rate_limit")
793 || stderr.contains("429")
794 || stderr.contains("Too Many Requests")
795 {
796 return PreflightOutcome::RateLimited {
797 reason: stderr.trim().to_string(),
798 suggestion: "wait for the rate-limit window to reset",
799 };
800 }
801 return PreflightOutcome::Error(AppError::Validation(format!(
802 "preflight probe failed: {stderr}",
803 stderr = stderr.trim()
804 )));
805 }
806 PreflightOutcome::Healthy
807 }
808 }
809}
810
811fn wait_with_timeout(
813 mut child: std::process::Child,
814 timeout: std::time::Duration,
815) -> Result<std::process::Output, AppError> {
816 use wait_timeout::ChildExt;
817 let start = std::time::Instant::now();
818 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
819 if status.is_none() {
820 let _ = child.kill();
821 let _ = child.wait();
822 return Err(AppError::Validation(format!(
823 "preflight probe timed out after {}s",
824 start.elapsed().as_secs()
825 )));
826 }
827 let mut stdout = Vec::new();
828 if let Some(mut out) = child.stdout.take() {
829 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
830 }
831 let mut stderr = Vec::new();
832 if let Some(mut err) = child.stderr.take() {
833 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
834 }
835 let exit = status.unwrap();
836 Ok(std::process::Output {
837 status: exit,
838 stdout,
839 stderr,
840 })
841}
842
843fn scan_unbound_memories(
854 conn: &Connection,
855 namespace: &str,
856 limit: Option<usize>,
857 name_filter: &[String],
858) -> Result<Vec<(i64, String, String)>, AppError> {
859 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
860
861 if name_filter.is_empty() {
862 let sql = format!(
863 "SELECT m.id, m.name, m.body
864 FROM memories m
865 WHERE m.namespace = ?1
866 AND m.deleted_at IS NULL
867 AND NOT EXISTS (
868 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
869 )
870 ORDER BY m.id
871 {limit_clause}"
872 );
873 let mut stmt = conn.prepare(&sql)?;
874 let rows = stmt
875 .query_map(rusqlite::params![namespace], |r| {
876 Ok((
877 r.get::<_, i64>(0)?,
878 r.get::<_, String>(1)?,
879 r.get::<_, String>(2)?,
880 ))
881 })?
882 .collect::<Result<Vec<_>, _>>()?;
883 Ok(rows)
884 } else {
885 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
887 .map(|i| format!("?{i}"))
888 .collect();
889 let in_clause = placeholders.join(", ");
890 let sql = format!(
891 "SELECT m.id, m.name, m.body
892 FROM memories m
893 WHERE m.namespace = ?1
894 AND m.deleted_at IS NULL
895 AND m.name IN ({in_clause})
896 AND NOT EXISTS (
897 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
898 )
899 ORDER BY m.id
900 {limit_clause}"
901 );
902 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
903 params_vec.push(&namespace);
904 for n in name_filter {
905 params_vec.push(n);
906 }
907 let mut stmt = conn.prepare(&sql)?;
908 let rows = stmt
909 .query_map(
910 rusqlite::params_from_iter(params_vec.iter().copied()),
911 |r| {
912 Ok((
913 r.get::<_, i64>(0)?,
914 r.get::<_, String>(1)?,
915 r.get::<_, String>(2)?,
916 ))
917 },
918 )?
919 .collect::<Result<Vec<_>, _>>()?;
920 Ok(rows)
921 }
922}
923
924fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
929 let content = std::fs::read_to_string(path).map_err(|e| {
930 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
931 })?;
932 let mut seen = std::collections::HashSet::new();
933 let mut out = Vec::new();
934 for line in content.lines() {
935 let trimmed = line.trim();
936 if trimmed.is_empty() || trimmed.starts_with('#') {
937 continue;
938 }
939 if seen.insert(trimmed.to_string()) {
940 out.push(trimmed.to_string());
941 }
942 }
943 Ok(out)
944}
945
946fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
948 let mut combined: Vec<String> = args.names.clone();
949 if let Some(p) = &args.names_file {
950 let from_file = read_names_file(p)?;
951 for n in from_file {
952 if !combined.contains(&n) {
953 combined.push(n);
954 }
955 }
956 }
957 Ok(combined)
958}
959
960fn scan_entities_without_description(
964 conn: &Connection,
965 namespace: &str,
966 limit: Option<usize>,
967) -> Result<Vec<(i64, String, String)>, AppError> {
968 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
969 let sql = format!(
970 "SELECT id, name, type
971 FROM entities
972 WHERE namespace = ?1
973 AND (description IS NULL OR description = '')
974 ORDER BY id
975 {limit_clause}"
976 );
977 let mut stmt = conn.prepare(&sql)?;
978 let rows = stmt
979 .query_map(rusqlite::params![namespace], |r| {
980 Ok((
981 r.get::<_, i64>(0)?,
982 r.get::<_, String>(1)?,
983 r.get::<_, String>(2)?,
984 ))
985 })?
986 .collect::<Result<Vec<_>, _>>()?;
987 Ok(rows)
988}
989
990fn scan_short_body_memories(
994 conn: &Connection,
995 namespace: &str,
996 min_chars: usize,
997 limit: Option<usize>,
998) -> Result<Vec<(i64, String, String)>, AppError> {
999 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1000 let sql = format!(
1001 "SELECT m.id, m.name, m.body
1002 FROM memories m
1003 WHERE m.namespace = ?1
1004 AND m.deleted_at IS NULL
1005 AND LENGTH(COALESCE(m.body,'')) < ?2
1006 ORDER BY m.id
1007 {limit_clause}"
1008 );
1009 let mut stmt = conn.prepare(&sql)?;
1010 let rows = stmt
1011 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1012 Ok((
1013 r.get::<_, i64>(0)?,
1014 r.get::<_, String>(1)?,
1015 r.get::<_, String>(2)?,
1016 ))
1017 })?
1018 .collect::<Result<Vec<_>, _>>()?;
1019 Ok(rows)
1020}
1021
1022fn scan_memories_without_embeddings(
1026 conn: &Connection,
1027 namespace: &str,
1028 limit: Option<usize>,
1029 name_filter: &[String],
1030) -> Result<Vec<(i64, String, String)>, AppError> {
1031 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1032
1033 if name_filter.is_empty() {
1034 let sql = format!(
1035 "SELECT m.id, m.name, COALESCE(m.body,'')
1036 FROM memories m
1037 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1038 WHERE m.namespace = ?1
1039 AND m.deleted_at IS NULL
1040 AND me.memory_id IS NULL
1041 ORDER BY m.id
1042 {limit_clause}"
1043 );
1044 let mut stmt = conn.prepare(&sql)?;
1045 let rows = stmt
1046 .query_map(rusqlite::params![namespace], |r| {
1047 Ok((
1048 r.get::<_, i64>(0)?,
1049 r.get::<_, String>(1)?,
1050 r.get::<_, String>(2)?,
1051 ))
1052 })?
1053 .collect::<Result<Vec<_>, _>>()?;
1054 Ok(rows)
1055 } else {
1056 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1057 .map(|i| format!("?{i}"))
1058 .collect();
1059 let in_clause = placeholders.join(", ");
1060 let sql = format!(
1061 "SELECT m.id, m.name, COALESCE(m.body,'')
1062 FROM memories m
1063 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1064 WHERE m.namespace = ?1
1065 AND m.deleted_at IS NULL
1066 AND m.name IN ({in_clause})
1067 AND me.memory_id IS NULL
1068 ORDER BY m.id
1069 {limit_clause}"
1070 );
1071 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1072 params_vec.push(&namespace);
1073 for n in name_filter {
1074 params_vec.push(n);
1075 }
1076 let mut stmt = conn.prepare(&sql)?;
1077 let rows = stmt
1078 .query_map(
1079 rusqlite::params_from_iter(params_vec.iter().copied()),
1080 |r| {
1081 Ok((
1082 r.get::<_, i64>(0)?,
1083 r.get::<_, String>(1)?,
1084 r.get::<_, String>(2)?,
1085 ))
1086 },
1087 )?
1088 .collect::<Result<Vec<_>, _>>()?;
1089 Ok(rows)
1090 }
1091}
1092
1093#[allow(clippy::type_complexity)]
1095fn scan_weight_candidates(
1096 conn: &Connection,
1097 namespace: &str,
1098 limit: Option<usize>,
1099) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1100 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1101 let sql = format!(
1102 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1103 FROM relationships r \
1104 JOIN entities e1 ON e1.id = r.source_id \
1105 JOIN entities e2 ON e2.id = r.target_id \
1106 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1107 ORDER BY r.weight DESC {limit_clause}"
1108 );
1109 let mut stmt = conn.prepare(&sql)?;
1110 let rows = stmt
1111 .query_map(rusqlite::params![namespace], |r| {
1112 Ok((
1113 r.get::<_, i64>(0)?,
1114 r.get::<_, String>(1)?,
1115 r.get::<_, String>(2)?,
1116 r.get::<_, String>(3)?,
1117 r.get::<_, f64>(4)?,
1118 ))
1119 })?
1120 .collect::<Result<Vec<_>, _>>()?;
1121 Ok(rows)
1122}
1123
1124fn scan_generic_relations(
1126 conn: &Connection,
1127 namespace: &str,
1128 limit: Option<usize>,
1129) -> Result<Vec<(i64, String, String, String)>, AppError> {
1130 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1131 let sql = format!(
1132 "SELECT r.id, e1.name, e2.name, r.relation \
1133 FROM relationships r \
1134 JOIN entities e1 ON e1.id = r.source_id \
1135 JOIN entities e2 ON e2.id = r.target_id \
1136 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1137 ORDER BY r.id {limit_clause}"
1138 );
1139 let mut stmt = conn.prepare(&sql)?;
1140 let rows = stmt
1141 .query_map(rusqlite::params![namespace], |r| {
1142 Ok((
1143 r.get::<_, i64>(0)?,
1144 r.get::<_, String>(1)?,
1145 r.get::<_, String>(2)?,
1146 r.get::<_, String>(3)?,
1147 ))
1148 })?
1149 .collect::<Result<Vec<_>, _>>()?;
1150 Ok(rows)
1151}
1152
1153fn persist_memory_bindings(
1162 conn: &Connection,
1163 namespace: &str,
1164 memory_id: i64,
1165 entities_json: &serde_json::Value,
1166 rels_json: &serde_json::Value,
1167) -> Result<(usize, usize), AppError> {
1168 #[derive(Deserialize)]
1169 struct EntityItem {
1170 name: String,
1171 entity_type: String,
1172 }
1173 #[derive(Deserialize)]
1174 struct RelItem {
1175 source: String,
1176 target: String,
1177 relation: String,
1178 strength: f64,
1179 }
1180
1181 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1182 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1183
1184 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1185 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1186
1187 let mut ent_count = 0usize;
1188 let mut rel_count = 0usize;
1189
1190 for item in &extracted_entities {
1191 let entity_type = match item.entity_type.parse::<EntityType>() {
1192 Ok(et) => et,
1193 Err(_) => {
1194 tracing::warn!(
1195 target: "enrich",
1196 entity = %item.name,
1197 entity_type = %item.entity_type,
1198 "entity type not recognized, skipping"
1199 );
1200 continue;
1201 }
1202 };
1203 match entities::upsert_entity(
1204 conn,
1205 namespace,
1206 &NewEntity {
1207 name: item.name.clone(),
1208 entity_type,
1209 description: None,
1210 },
1211 ) {
1212 Ok(eid) => {
1213 let _ = entities::link_memory_entity(conn, memory_id, eid);
1214 ent_count += 1;
1215 }
1216 Err(e) => {
1217 tracing::warn!(
1218 target: "enrich",
1219 entity = %item.name,
1220 error = %e,
1221 "entity upsert skipped"
1222 );
1223 }
1224 }
1225 }
1226
1227 for rel in &extracted_rels {
1228 let normalized = crate::parsers::normalize_relation(&rel.relation);
1229 crate::parsers::warn_if_non_canonical(&normalized);
1230
1231 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1234 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1235 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1236 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1237 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1238 let new_rel = NewRelationship {
1239 source: rel.source.clone(),
1240 target: rel.target.clone(),
1241 relation: normalized,
1242 strength: rel.strength,
1243 description: None,
1244 };
1245 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1246 rel_count += 1;
1247 }
1248 }
1249 }
1250
1251 Ok((ent_count, rel_count))
1252}
1253
1254fn persist_entity_description(
1256 conn: &Connection,
1257 entity_id: i64,
1258 description: &str,
1259) -> Result<(), AppError> {
1260 conn.execute(
1261 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1262 rusqlite::params![description, entity_id],
1263 )?;
1264 Ok(())
1265}
1266
1267#[allow(clippy::too_many_arguments)]
1273fn reembed_memory_vector(
1274 conn: &Connection,
1275 namespace: &str,
1276 memory_id: i64,
1277 memory_name: &str,
1278 memory_type: &str,
1279 body: &str,
1280 paths: &crate::paths::AppPaths,
1281 llm_backend: crate::cli::LlmBackendChoice,
1282) -> Result<(), AppError> {
1283 let snippet: String = body.chars().take(200).collect();
1284 let (embedding, backend_kind) =
1289 crate::embedder::embed_passage_with_choice(&paths.models, body, Some(llm_backend))?;
1290 record_enrich_backend(backend_kind.as_str());
1291 memories::upsert_vec(
1292 conn,
1293 memory_id,
1294 namespace,
1295 memory_type,
1296 &embedding,
1297 memory_name,
1298 &snippet,
1299 )?;
1300 Ok(())
1301}
1302
/// Name of the backend most recently recorded by [`record_enrich_backend`];
/// `None` until one is recorded or after [`take_enrich_backend`] drained it.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);

/// Remembers `backend` as the most recently used backend.
///
/// If the mutex is poisoned the value is left untouched.
fn record_enrich_backend(backend: &'static str) {
    let locked = ENRICH_LAST_BACKEND.lock();
    if let Ok(mut slot) = locked {
        *slot = Some(backend);
    }
}

/// Returns the most recently recorded backend and clears the slot.
///
/// Yields `None` when nothing has been recorded since the last call or when
/// the mutex is poisoned.
fn take_enrich_backend() -> Option<&'static str> {
    match ENRICH_LAST_BACKEND.lock() {
        Ok(mut slot) => slot.take(),
        Err(_) => None,
    }
}
1319
1320fn persist_enriched_body(
1325 conn: &Connection,
1326 namespace: &str,
1327 memory_id: i64,
1328 memory_name: &str,
1329 new_body: &str,
1330 paths: &crate::paths::AppPaths,
1331 llm_backend: crate::cli::LlmBackendChoice,
1332) -> Result<(), AppError> {
1333 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1335 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1336 rusqlite::params![memory_id],
1337 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1338 )?;
1339
1340 let memory_type: String = conn.query_row(
1341 "SELECT type FROM memories WHERE id=?1",
1342 rusqlite::params![memory_id],
1343 |r| r.get(0),
1344 )?;
1345
1346 let description: String = conn.query_row(
1347 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1348 rusqlite::params![memory_id],
1349 |r| r.get(0),
1350 )?;
1351
1352 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1353
1354 let new_memory = memories::NewMemory {
1355 namespace: namespace.to_string(),
1356 name: memory_name.to_string(),
1357 memory_type: memory_type.clone(),
1358 description: description.clone(),
1359 body: new_body.to_string(),
1360 body_hash,
1361 session_id: None,
1362 source: "agent".to_string(),
1363 metadata: serde_json::json!({
1364 "operation": "body-enrich",
1365 "orig_chars": old_body.chars().count(),
1366 "new_chars": new_body.chars().count(),
1367 }),
1368 };
1369
1370 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1374 let version_metadata = serde_json::json!({
1375 "operation": "body-enrich",
1376 "orig_chars": old_body.chars().count(),
1377 "new_chars": new_body.chars().count(),
1378 })
1379 .to_string();
1380 crate::storage::versions::insert_version(
1381 conn,
1382 memory_id,
1383 next_version,
1384 memory_name,
1385 &memory_type,
1386 &description,
1387 new_body,
1388 &version_metadata,
1389 Some("enrich"),
1390 "edit",
1391 )?;
1392
1393 memories::update(conn, memory_id, &new_memory, None)?;
1394 memories::sync_fts_after_update(
1395 conn,
1396 memory_id,
1397 &old_name,
1398 &old_desc,
1399 &old_body,
1400 &new_memory.name,
1401 &new_memory.description,
1402 &new_memory.body,
1403 )?;
1404
1405 if let Err(e) = reembed_memory_vector(
1407 conn,
1408 namespace,
1409 memory_id,
1410 memory_name,
1411 &memory_type,
1412 new_body,
1413 paths,
1414 llm_backend,
1415 ) {
1416 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1417 }
1418
1419 Ok(())
1420}
1421
/// Returns `true` when `value` compares equal to `default`.
///
/// Used to tell whether a flag was left at its default value.
fn is_at_default<T>(value: T, default: T) -> bool
where
    T: PartialEq,
{
    PartialEq::eq(&value, &default)
}
1436
1437fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1452 const DEFAULT_TIMEOUT: u64 = 300;
1453
1454 let mut conflicts: Vec<String> = Vec::new();
1455
1456 match args.mode {
1457 EnrichMode::ClaudeCode => {
1458 if args.codex_binary.is_some() {
1459 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1460 }
1461 if args.codex_model.is_some() {
1462 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1463 }
1464 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1465 conflicts.push(format!(
1466 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1467 args.codex_timeout
1468 ));
1469 }
1470 }
1471 EnrichMode::Codex => {
1472 if args.claude_binary.is_some() {
1473 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1474 }
1475 if args.claude_model.is_some() {
1476 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1477 }
1478 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1479 conflicts.push(format!(
1480 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1481 args.claude_timeout
1482 ));
1483 }
1484 if args.max_cost_usd.is_some() {
1485 conflicts.push(
1486 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1487 .to_string(),
1488 );
1489 }
1490 }
1491 }
1492
1493 if !conflicts.is_empty() {
1494 return Err(AppError::Validation(format!(
1495 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1496 args.mode,
1497 conflicts.join("\n - ")
1498 )));
1499 }
1500
1501 Ok(())
1502}
1503
1504pub fn run(args: &EnrichArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1508 validate_mode_conditional_flags_enrich(args)?;
1511 let started = Instant::now();
1512
1513 let paths = AppPaths::resolve(args.db.as_deref())?;
1514 ensure_db_ready(&paths)?;
1515 let conn = open_rw(&paths.db)?;
1516 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1517
1518 let wait_secs = args.wait_job_singleton;
1524 let force_flag = args.force_job_singleton;
1525 let _singleton = crate::lock::acquire_job_singleton(
1526 crate::lock::JobType::Enrich,
1527 &namespace,
1528 &paths.db,
1529 wait_secs,
1530 force_flag,
1531 )?;
1532
1533 let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1535 None
1536 } else {
1537 Some(match args.mode {
1538 EnrichMode::ClaudeCode => {
1539 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1540 let version = super::claude_runner::validate_claude_version(&bin)?;
1541 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1542 emit_json(&PhaseEvent {
1543 phase: "validate",
1544 binary_path: bin.to_str(),
1545 version: Some(&version),
1546 items_total: None,
1547 items_pending: None,
1548 llm_parallelism: None,
1549 });
1550 bin
1551 }
1552 EnrichMode::Codex => {
1553 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1554 emit_json(&PhaseEvent {
1555 phase: "validate",
1556 binary_path: bin.to_str(),
1557 version: None,
1558 items_total: None,
1559 items_pending: None,
1560 llm_parallelism: None,
1561 });
1562 bin
1563 }
1564 })
1565 };
1566
1567 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1571 let load = crate::system_load::load_average_one();
1572 let n = crate::system_load::ncpus();
1573 return Err(AppError::Validation(format!(
1574 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1575 pass --no-max-load-check to override (not recommended)"
1576 )));
1577 }
1578
1579 if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1586 {
1587 let preflight_result = run_preflight_probe(args);
1588 match preflight_result {
1589 PreflightOutcome::Healthy => {
1590 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1591 }
1592 PreflightOutcome::RateLimited { reason, suggestion } => {
1593 if let Some(fallback) = args.fallback_mode.clone() {
1594 if fallback != args.mode {
1595 return Err(AppError::Validation(format!(
1605 "preflight detected rate limit on {mode:?}: {reason}; \
1606 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1607 mode = args.mode
1608 )));
1609 }
1610 return Err(AppError::Validation(format!(
1611 "preflight detected rate limit on {mode:?}: {reason}; \
1612 --fallback-mode matches --mode, no recovery possible",
1613 mode = args.mode
1614 )));
1615 }
1616 return Err(AppError::Validation(format!(
1617 "preflight detected rate limit on {mode:?}: {reason}; \
1618 {suggestion}; pass --fallback-mode codex to recover",
1619 mode = args.mode
1620 )));
1621 }
1622 PreflightOutcome::Error(e) => {
1623 return Err(e);
1624 }
1625 }
1626 }
1627
1628 let scan_result = scan_operation(&conn, &namespace, args)?;
1630 let total = scan_result.len();
1631
1632 emit_json(&PhaseEvent {
1633 phase: "scan",
1634 binary_path: None,
1635 version: None,
1636 items_total: Some(total),
1637 items_pending: Some(total),
1638 llm_parallelism: Some(args.llm_parallelism),
1639 });
1640
1641 if args.dry_run {
1643 for (idx, key) in scan_result.iter().enumerate() {
1644 emit_json(&ItemEvent {
1645 item: key,
1646 status: "preview",
1647 memory_id: None,
1648 entity_id: None,
1649 entities: None,
1650 rels: None,
1651 chars_before: None,
1652 chars_after: None,
1653 cost_usd: None,
1654 elapsed_ms: None,
1655 error: None,
1656 index: idx,
1657 total,
1658 });
1659 }
1660 emit_json(&EnrichSummary {
1661 summary: true,
1662 operation: format!("{:?}", args.operation),
1663 items_total: total,
1664 completed: 0,
1665 failed: 0,
1666 skipped: 0,
1667 cost_usd: 0.0,
1668 elapsed_ms: started.elapsed().as_millis() as u64,
1669 backend_invoked: take_enrich_backend(),
1670 });
1671 return Ok(());
1672 }
1673
1674 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1678
1679 if args.resume {
1680 let reset = queue_conn
1681 .execute(
1682 "UPDATE queue SET status='pending' WHERE status='processing'",
1683 [],
1684 )
1685 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1686 if reset > 0 {
1687 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1688 }
1689 }
1690
1691 if args.retry_failed {
1692 let count = queue_conn
1693 .execute(
1694 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1695 [],
1696 )
1697 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1698 tracing::info!(target: "enrich", count, "retrying failed items");
1699 }
1700
1701 if !args.resume && !args.retry_failed {
1702 queue_conn
1703 .execute("DELETE FROM queue", [])
1704 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1705 }
1706
1707 for (idx, key) in scan_result.iter().enumerate() {
1709 let item_type = match args.operation {
1710 EnrichOperation::EntityDescriptions => "entity",
1711 _ => "memory",
1712 };
1713 if let Err(e) = queue_conn.execute(
1714 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1715 rusqlite::params![key, item_type],
1716 ) {
1717 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1718 }
1719 let _ = idx; }
1721
1722 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1725 if parallelism > 1 {
1726 tracing::info!(
1727 target: "enrich",
1728 llm_parallelism = parallelism,
1729 "parallel LLM processing with bounded thread pool"
1730 );
1731 }
1732 if parallelism > 4 {
1736 match args.mode {
1737 EnrichMode::ClaudeCode => {
1738 tracing::warn!(
1739 target: "enrich",
1740 llm_parallelism = parallelism,
1741 recommended_max = 4,
1742 mode = "claude-code",
1743 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1744 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1745 to cut MCP children (G28-A)"
1746 );
1747 }
1748 EnrichMode::Codex if parallelism > 16 => {
1749 tracing::warn!(
1750 target: "enrich",
1751 llm_parallelism = parallelism,
1752 recommended_max = 16,
1753 mode = "codex",
1754 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1755 consider --llm-parallelism 8 for safer concurrency"
1756 );
1757 }
1758 EnrichMode::Codex => {
1759 }
1763 }
1764 }
1765
1766 let mut completed = 0usize;
1767 let mut failed = 0usize;
1768 let mut skipped = 0usize;
1769 let mut cost_total = 0.0f64;
1770 let mut oauth_detected = false;
1771 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1772 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1773 let enrich_started = std::time::Instant::now();
1774
1775 let provider_timeout = match args.mode {
1776 EnrichMode::ClaudeCode => args.claude_timeout,
1777 EnrichMode::Codex => args.codex_timeout,
1778 };
1779
1780 let provider_model: Option<&str> = match args.mode {
1781 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1782 EnrichMode::Codex => args.codex_model.as_deref(),
1783 };
1784
1785 if parallelism > 1 {
1789 let stdout_mu = parking_lot::Mutex::new(());
1790 let budget = args.max_cost_usd;
1791 let operation = args.operation.clone();
1792 let mode = args.mode.clone();
1793 let min_oc = args.min_output_chars;
1794 let max_oc = args.max_output_chars;
1795 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1796
1797 struct WorkerResult {
1798 completed: usize,
1799 failed: usize,
1800 skipped: usize,
1801 cost: f64,
1802 oauth: bool,
1803 }
1804
1805 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1806 let handles: Vec<_> = (0..parallelism)
1807 .map(|worker_id| {
1808 let stdout_mu = &stdout_mu;
1809 let paths = &paths;
1810 let namespace = &namespace;
1811 let provider_binary = provider_binary.as_deref();
1812 let operation = &operation;
1813 let mode = &mode;
1814 let prompt_tpl = prompt_tpl.as_deref();
1815 s.spawn(move || {
1816 let w_conn = match open_rw(&paths.db) {
1817 Ok(c) => c,
1818 Err(e) => {
1819 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1820 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1821 }
1822 };
1823 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1824 Ok(c) => c,
1825 Err(e) => {
1826 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1827 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1828 }
1829 };
1830 let mut w_completed = 0usize;
1831 let mut w_failed = 0usize;
1832 let mut w_skipped = 0usize;
1833 let mut w_cost = 0.0f64;
1834 let mut w_oauth = false;
1835 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1836 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1837 let mut w_breaker = crate::retry::CircuitBreaker::new(
1843 args.circuit_breaker_threshold.max(1),
1844 std::time::Duration::from_secs(60),
1845 );
1846
1847 loop {
1848 if crate::shutdown_requested() {
1849 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1850 break;
1851 }
1852 if let Some(b) = budget {
1853 if !w_oauth && w_cost >= b {
1854 break;
1855 }
1856 }
1857 let pending: Option<(i64, String, String)> = w_queue
1858 .query_row(
1859 "UPDATE queue SET status='processing', attempt=attempt+1 \
1860 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1861 RETURNING id, item_key, item_type",
1862 [],
1863 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1864 )
1865 .ok();
1866 let (queue_id, item_key, _item_type) = match pending {
1867 Some(p) => p,
1868 None => break,
1869 };
1870 let item_started = Instant::now();
1871 let current_index = w_completed + w_failed + w_skipped;
1872
1873 let call_result = match operation {
1874 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1875 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1876 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend),
1877 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend),
1878 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1879 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1880 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1881 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1882 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1883 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1884 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1885 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1886 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
1887 };
1888
1889 match call_result {
1890 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1891 if is_oauth { w_oauth = true; }
1892 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1893 let _ = w_queue.execute(
1894 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1895 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1896 );
1897 w_completed += 1;
1898 if !is_oauth { w_cost += cost; }
1899 let _ = w_breaker
1901 .record(crate::retry::AttemptOutcome::Success);
1902 let _guard = stdout_mu.lock();
1903 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1904 }
1905 Ok(EnrichItemResult::Skipped { reason }) => {
1906 w_skipped += 1;
1907 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1908 let _guard = stdout_mu.lock();
1909 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1910 }
1911 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1912 w_skipped += 1;
1918 let reason = format!(
1919 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1920 );
1921 let _ = w_queue.execute(
1922 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1923 rusqlite::params![reason, queue_id],
1924 );
1925 let _guard = stdout_mu.lock();
1926 emit_json(&ItemEvent {
1927 item: &item_key,
1928 status: "preservation_failed",
1929 memory_id: None,
1930 entity_id: None,
1931 entities: None,
1932 rels: None,
1933 chars_before: Some(chars_before),
1934 chars_after: Some(chars_after),
1935 cost_usd: None,
1936 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1937 error: Some(reason),
1938 index: current_index,
1939 total,
1940 });
1941 }
1942 Err(e) => {
1943 let err_str = format!("{e}");
1944 if matches!(e, AppError::RateLimited { .. }) {
1945 if crate::retry::is_kill_switch_active() {
1946 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1947 } else if std::time::Instant::now() >= w_deadline {
1948 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1949 } else {
1950 let half = w_backoff / 2;
1951 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1952 let actual_wait = half + jitter;
1953 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1954 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1955 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1956 w_backoff = (w_backoff * 2).min(900);
1957 continue;
1958 }
1959 }
1960 w_failed += 1;
1961 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1962 let _guard = stdout_mu.lock();
1963 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1964 let breaker_opened = w_breaker
1966 .record(crate::retry::AttemptOutcome::HardFailure);
1967 if breaker_opened {
1968 tracing::error!(target: "enrich",
1969 consecutive_failures = w_breaker.consecutive_failures(),
1970 "circuit breaker opened — aborting worker"
1971 );
1972 break;
1973 }
1974 }
1975 }
1976 }
1977 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1978 })
1979 })
1980 .collect();
1981 handles
1982 .into_iter()
1983 .map(|h| {
1984 h.join().unwrap_or(WorkerResult {
1985 completed: 0,
1986 failed: 0,
1987 skipped: 0,
1988 cost: 0.0,
1989 oauth: false,
1990 })
1991 })
1992 .collect()
1993 });
1994
1995 for r in &results {
1996 completed += r.completed;
1997 failed += r.failed;
1998 skipped += r.skipped;
1999 cost_total += r.cost;
2000 if r.oauth && !oauth_detected {
2001 oauth_detected = true;
2002 }
2003 }
2004 } else {
2005 loop {
2007 if crate::shutdown_requested() {
2008 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2009 break;
2010 }
2011
2012 if let Some(budget) = args.max_cost_usd {
2014 if !oauth_detected && cost_total >= budget {
2015 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2016 break;
2017 }
2018 }
2019
2020 let pending: Option<(i64, String, String)> = queue_conn
2022 .query_row(
2023 "UPDATE queue SET status='processing', attempt=attempt+1 \
2024 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2025 RETURNING id, item_key, item_type",
2026 [],
2027 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2028 )
2029 .ok();
2030
2031 let (queue_id, item_key, item_type) = match pending {
2032 Some(p) => p,
2033 None => break,
2034 };
2035
2036 let item_started = Instant::now();
2037 let current_index = completed + failed + skipped;
2038
2039 let call_result = match args.operation {
2040 EnrichOperation::MemoryBindings => call_memory_bindings(
2041 &conn,
2042 &namespace,
2043 &item_key,
2044 provider_binary
2045 .as_deref()
2046 .expect("provider binary required"),
2047 provider_model,
2048 provider_timeout,
2049 &args.mode,
2050 ),
2051 EnrichOperation::EntityDescriptions => call_entity_description(
2052 &conn,
2053 &namespace,
2054 &item_key,
2055 provider_binary
2056 .as_deref()
2057 .expect("provider binary required"),
2058 provider_model,
2059 provider_timeout,
2060 &args.mode,
2061 ),
2062 EnrichOperation::BodyEnrich => call_body_enrich(
2063 &conn,
2064 &namespace,
2065 &item_key,
2066 provider_binary
2067 .as_deref()
2068 .expect("provider binary required"),
2069 provider_model,
2070 provider_timeout,
2071 &args.mode,
2072 args.min_output_chars,
2073 args.max_output_chars,
2074 args.prompt_template.as_deref(),
2075 args.preserve_threshold,
2076 &paths,
2077 llm_backend,
2078 ),
2079 EnrichOperation::ReEmbed => {
2080 call_reembed(&conn, &namespace, &item_key, &paths, llm_backend)
2081 }
2082 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2083 &conn,
2084 &namespace,
2085 &item_key,
2086 provider_binary
2087 .as_deref()
2088 .expect("provider binary required"),
2089 provider_model,
2090 provider_timeout,
2091 &args.mode,
2092 ),
2093 EnrichOperation::RelationReclassify => call_relation_reclassify(
2094 &conn,
2095 &namespace,
2096 &item_key,
2097 provider_binary
2098 .as_deref()
2099 .expect("provider binary required"),
2100 provider_model,
2101 provider_timeout,
2102 &args.mode,
2103 ),
2104 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2105 call_entity_connect(
2106 &conn,
2107 &namespace,
2108 &item_key,
2109 provider_binary
2110 .as_deref()
2111 .expect("provider binary required"),
2112 provider_model,
2113 provider_timeout,
2114 &args.mode,
2115 )
2116 }
2117 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2118 &conn,
2119 &namespace,
2120 &item_key,
2121 provider_binary
2122 .as_deref()
2123 .expect("provider binary required"),
2124 provider_model,
2125 provider_timeout,
2126 &args.mode,
2127 ),
2128 EnrichOperation::DescriptionEnrich => call_description_enrich(
2129 &conn,
2130 &namespace,
2131 &item_key,
2132 provider_binary
2133 .as_deref()
2134 .expect("provider binary required"),
2135 provider_model,
2136 provider_timeout,
2137 &args.mode,
2138 ),
2139 EnrichOperation::DomainClassify => call_domain_classify(
2140 &conn,
2141 &namespace,
2142 &item_key,
2143 provider_binary
2144 .as_deref()
2145 .expect("provider binary required"),
2146 provider_model,
2147 provider_timeout,
2148 &args.mode,
2149 ),
2150 EnrichOperation::GraphAudit => call_graph_audit(
2151 &conn,
2152 &namespace,
2153 &item_key,
2154 provider_binary
2155 .as_deref()
2156 .expect("provider binary required"),
2157 provider_model,
2158 provider_timeout,
2159 &args.mode,
2160 ),
2161 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2162 &conn,
2163 &namespace,
2164 &item_key,
2165 provider_binary
2166 .as_deref()
2167 .expect("provider binary required"),
2168 provider_model,
2169 provider_timeout,
2170 &args.mode,
2171 ),
2172 EnrichOperation::BodyExtract => call_body_extract(
2173 &conn,
2174 &namespace,
2175 &item_key,
2176 provider_binary
2177 .as_deref()
2178 .expect("provider binary required"),
2179 provider_model,
2180 provider_timeout,
2181 &args.mode,
2182 ),
2183 };
2184
2185 match call_result {
2186 Ok(EnrichItemResult::Done {
2187 memory_id,
2188 entity_id,
2189 entities,
2190 rels,
2191 chars_before,
2192 chars_after,
2193 cost,
2194 is_oauth,
2195 }) => {
2196 if is_oauth && !oauth_detected {
2197 oauth_detected = true;
2198 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2199 }
2200 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2201
2202 let persist_err: Option<String> = match args.operation {
2204 EnrichOperation::MemoryBindings => {
2205 None
2207 }
2208 EnrichOperation::EntityDescriptions => {
2209 None
2211 }
2212 EnrichOperation::BodyEnrich => {
2213 None
2215 }
2216 _ => {
2217 None
2219 }
2220 };
2221
2222 if let Err(e) = queue_conn.execute(
2223 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2224 rusqlite::params![
2225 memory_id,
2226 entity_id,
2227 entities as i64,
2228 rels as i64,
2229 cost,
2230 item_started.elapsed().as_millis() as i64,
2231 queue_id
2232 ],
2233 ) {
2234 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2235 }
2236
2237 if persist_err.is_none() {
2238 completed += 1;
2239 if !is_oauth {
2240 cost_total += cost;
2241 }
2242 emit_json(&ItemEvent {
2243 item: &item_key,
2244 status: "done",
2245 memory_id,
2246 entity_id,
2247 entities: Some(entities),
2248 rels: Some(rels),
2249 chars_before,
2250 chars_after,
2251 cost_usd: if is_oauth { None } else { Some(cost) },
2252 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2253 error: None,
2254 index: current_index,
2255 total,
2256 });
2257 } else {
2258 failed += 1;
2259 emit_json(&ItemEvent {
2260 item: &item_key,
2261 status: "failed",
2262 memory_id: None,
2263 entity_id: None,
2264 entities: None,
2265 rels: None,
2266 chars_before: None,
2267 chars_after: None,
2268 cost_usd: None,
2269 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2270 error: persist_err,
2271 index: current_index,
2272 total,
2273 });
2274 }
2275 }
2276 Ok(EnrichItemResult::Skipped { reason }) => {
2277 skipped += 1;
2278 if let Err(e) = queue_conn.execute(
2279 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2280 rusqlite::params![reason, queue_id],
2281 ) {
2282 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2283 }
2284 emit_json(&ItemEvent {
2285 item: &item_key,
2286 status: "skipped",
2287 memory_id: None,
2288 entity_id: None,
2289 entities: None,
2290 rels: None,
2291 chars_before: None,
2292 chars_after: None,
2293 cost_usd: None,
2294 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2295 error: None,
2296 index: current_index,
2297 total,
2298 });
2299 }
2300 Ok(EnrichItemResult::PreservationFailed {
2301 score,
2302 threshold,
2303 chars_before,
2304 chars_after,
2305 }) => {
2306 skipped += 1;
2313 let reason = format!(
2314 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2315 );
2316 if let Err(qe) = queue_conn.execute(
2317 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2318 rusqlite::params![reason, queue_id],
2319 ) {
2320 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2321 }
2322 emit_json(&ItemEvent {
2323 item: &item_key,
2324 status: "preservation_failed",
2325 memory_id: None,
2326 entity_id: None,
2327 entities: None,
2328 rels: None,
2329 chars_before: Some(chars_before),
2330 chars_after: Some(chars_after),
2331 cost_usd: None,
2332 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2333 error: Some(reason),
2334 index: current_index,
2335 total,
2336 });
2337 }
2338 Err(e) => {
2339 let err_str = format!("{e}");
2340 if matches!(e, AppError::RateLimited { .. }) {
2341 if crate::retry::is_kill_switch_active() {
2342 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2343 } else if std::time::Instant::now() >= rate_limit_deadline {
2344 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2345 } else {
2346 let half = backoff_secs / 2;
2347 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2348 let actual_wait = half + jitter;
2349 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2350 if let Err(qe) = queue_conn.execute(
2351 "UPDATE queue SET status='pending' WHERE id=?1",
2352 rusqlite::params![queue_id],
2353 ) {
2354 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2355 }
2356 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2357 backoff_secs = (backoff_secs * 2).min(900);
2358 continue;
2359 }
2360 }
2361
2362 failed += 1;
2363 if let Err(qe) = queue_conn.execute(
2364 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2365 rusqlite::params![err_str, queue_id],
2366 ) {
2367 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2368 }
2369 emit_json(&ItemEvent {
2370 item: &item_key,
2371 status: "failed",
2372 memory_id: None,
2373 entity_id: None,
2374 entities: None,
2375 rels: None,
2376 chars_before: None,
2377 chars_after: None,
2378 cost_usd: None,
2379 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2380 error: Some(err_str),
2381 index: current_index,
2382 total,
2383 });
2384 }
2385 }
2386
2387 let _ = item_type; }
2389 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2392 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2393
2394 emit_json(&EnrichSummary {
2395 summary: true,
2396 operation: format!("{:?}", args.operation),
2397 items_total: total,
2398 completed,
2399 failed,
2400 skipped,
2401 cost_usd: cost_total,
2402 elapsed_ms: started.elapsed().as_millis() as u64,
2403 backend_invoked: take_enrich_backend(),
2404 });
2405
2406 if failed == 0 {
2407 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2408 }
2409
2410 Ok(())
2411}
2412
/// Outcome of processing one queue item, as returned by the `call_*` workers.
///
/// `Debug`, `Clone` and `PartialEq` are derived so results can be logged and
/// compared; every field is a plain standard-library type.
#[derive(Debug, Clone, PartialEq)]
enum EnrichItemResult {
    /// The item was processed successfully.
    Done {
        /// Row id of the memory the item refers to, when it is memory-scoped.
        memory_id: Option<i64>,
        /// Row id of the entity the item refers to, when it is entity-scoped.
        entity_id: Option<i64>,
        /// Entity count reported by the worker for this item.
        entities: usize,
        /// Relationship count reported by the worker for this item.
        rels: usize,
        /// Text length before the operation, for workers that report one.
        chars_before: Option<usize>,
        /// Text length after the operation, for workers that report one.
        chars_after: Option<usize>,
        /// Cost reported by the backend call (`0.0` when no backend is invoked).
        cost: f64,
        /// When `true`, the dispatcher omits the cost from the item event and
        /// does not add it to the running total.
        is_oauth: bool,
    },
    /// The item was intentionally not processed; `reason` explains why.
    Skipped {
        /// Human-readable explanation stored in the queue row.
        reason: String,
    },
    /// The enriched body was rejected by the preservation check.
    PreservationFailed {
        /// Similarity score computed by the preservation check.
        score: f64,
        /// Threshold the score was evaluated against.
        threshold: f64,
        /// Character count of the original body.
        chars_before: usize,
        /// Character count of the rejected enriched body.
        chars_after: usize,
    },
}
2442
2443fn call_memory_bindings(
2448 conn: &Connection,
2449 namespace: &str,
2450 memory_name: &str,
2451 binary: &Path,
2452 model: Option<&str>,
2453 timeout: u64,
2454 mode: &EnrichMode,
2455) -> Result<EnrichItemResult, AppError> {
2456 let (memory_id, body): (i64, String) = conn.query_row(
2458 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2459 rusqlite::params![namespace, memory_name],
2460 |r| Ok((r.get(0)?, r.get(1)?)),
2461 ).map_err(|e| match e {
2462 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2463 other => AppError::Database(other),
2464 })?;
2465
2466 if body.trim().is_empty() {
2467 return Ok(EnrichItemResult::Skipped {
2468 reason: "body is empty".to_string(),
2469 });
2470 }
2471
2472 let (value, cost, is_oauth) = match mode {
2473 EnrichMode::ClaudeCode => call_claude(
2474 binary,
2475 BINDINGS_PROMPT,
2476 BINDINGS_SCHEMA,
2477 &body,
2478 model,
2479 timeout,
2480 )?,
2481 EnrichMode::Codex => call_codex(
2482 binary,
2483 BINDINGS_PROMPT,
2484 BINDINGS_SCHEMA,
2485 &body,
2486 model,
2487 timeout,
2488 )?,
2489 };
2490
2491 let empty_arr = serde_json::Value::Array(vec![]);
2492 let entities_val = value.get("entities").unwrap_or(&empty_arr);
2493 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2494
2495 let (ent_count, rel_count) =
2496 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2497
2498 Ok(EnrichItemResult::Done {
2499 memory_id: Some(memory_id),
2500 entity_id: None,
2501 entities: ent_count,
2502 rels: rel_count,
2503 chars_before: None,
2504 chars_after: None,
2505 cost,
2506 is_oauth,
2507 })
2508}
2509
2510fn call_entity_description(
2511 conn: &Connection,
2512 namespace: &str,
2513 entity_name: &str,
2514 binary: &Path,
2515 model: Option<&str>,
2516 timeout: u64,
2517 mode: &EnrichMode,
2518) -> Result<EnrichItemResult, AppError> {
2519 let (entity_id, entity_type): (i64, String) = conn
2520 .query_row(
2521 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2522 rusqlite::params![namespace, entity_name],
2523 |r| Ok((r.get(0)?, r.get(1)?)),
2524 )
2525 .map_err(|e| match e {
2526 rusqlite::Error::QueryReturnedNoRows => {
2527 AppError::NotFound(format!("entity '{entity_name}' not found"))
2528 }
2529 other => AppError::Database(other),
2530 })?;
2531
2532 let prompt = format!(
2533 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2534 );
2535
2536 let (value, cost, is_oauth) = match mode {
2537 EnrichMode::ClaudeCode => call_claude(
2538 binary,
2539 &prompt,
2540 ENTITY_DESCRIPTION_SCHEMA,
2541 "",
2542 model,
2543 timeout,
2544 )?,
2545 EnrichMode::Codex => call_codex(
2546 binary,
2547 &prompt,
2548 ENTITY_DESCRIPTION_SCHEMA,
2549 "",
2550 model,
2551 timeout,
2552 )?,
2553 };
2554
2555 let description = value
2556 .get("description")
2557 .and_then(|v| v.as_str())
2558 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2559
2560 persist_entity_description(conn, entity_id, description)?;
2561
2562 Ok(EnrichItemResult::Done {
2563 memory_id: None,
2564 entity_id: Some(entity_id),
2565 entities: 0,
2566 rels: 0,
2567 chars_before: None,
2568 chars_after: None,
2569 cost,
2570 is_oauth,
2571 })
2572}
2573
2574#[allow(clippy::too_many_arguments)]
2575fn call_body_enrich(
2576 conn: &Connection,
2577 namespace: &str,
2578 memory_name: &str,
2579 binary: &Path,
2580 model: Option<&str>,
2581 timeout: u64,
2582 mode: &EnrichMode,
2583 min_output_chars: usize,
2584 max_output_chars: usize,
2585 prompt_template: Option<&Path>,
2586 preserve_threshold: f64,
2587 paths: &crate::paths::AppPaths,
2588 llm_backend: crate::cli::LlmBackendChoice,
2589) -> Result<EnrichItemResult, AppError> {
2590 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2591 .query_row(
2592 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2593 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2594 rusqlite::params![namespace, memory_name],
2595 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2596 )
2597 .map_err(|e| match e {
2598 rusqlite::Error::QueryReturnedNoRows => {
2599 AppError::NotFound(format!("memory '{memory_name}' not found"))
2600 }
2601 other => AppError::Database(other),
2602 })?;
2603
2604 let chars_before = body.chars().count();
2605
2606 let linked_entities: Vec<String> = {
2608 let mut stmt = conn.prepare_cached(
2609 "SELECT e.name FROM memory_entities me \
2610 JOIN entities e ON e.id = me.entity_id \
2611 WHERE me.memory_id = ?1 LIMIT 10",
2612 )?;
2613 let result: Vec<String> = stmt
2614 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2615 .filter_map(|r| r.ok())
2616 .collect();
2617 drop(stmt);
2618 result
2619 };
2620
2621 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2623 let file_size = std::fs::metadata(tmpl_path)
2624 .map_err(|e| {
2625 AppError::Io(std::io::Error::new(
2626 e.kind(),
2627 format!("failed to stat prompt template: {e}"),
2628 ))
2629 })?
2630 .len();
2631 if file_size > MAX_MEMORY_BODY_LEN as u64 {
2632 return Err(AppError::LimitExceeded(
2633 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2634 ));
2635 }
2636 std::fs::read_to_string(tmpl_path).map_err(|e| {
2637 AppError::Io(std::io::Error::new(
2638 e.kind(),
2639 format!("failed to read prompt template: {e}"),
2640 ))
2641 })?
2642 } else {
2643 BODY_ENRICH_PROMPT_PREFIX.to_string()
2644 };
2645
2646 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2648 let mut ctx = String::new();
2649 ctx.push_str(&format!(
2650 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2651 ));
2652 if !description.is_empty() {
2653 ctx.push_str(&format!("- Description: {description}\n"));
2654 }
2655 ctx.push_str(&format!("- Domain: {namespace}\n"));
2656 if !linked_entities.is_empty() {
2657 ctx.push_str(&format!(
2658 "- Linked entities: {}\n",
2659 linked_entities.join(", ")
2660 ));
2661 }
2662 ctx
2663 } else {
2664 String::new()
2665 };
2666
2667 let prompt = format!(
2668 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2669 );
2670
2671 let (value, cost, is_oauth) = match mode {
2673 EnrichMode::ClaudeCode => {
2674 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2675 }
2676 EnrichMode::Codex => {
2677 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2678 }
2679 };
2680
2681 let enriched_body = value
2682 .get("enriched_body")
2683 .and_then(|v| v.as_str())
2684 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2685
2686 let chars_after = enriched_body.chars().count();
2687
2688 let threshold = preserve_threshold;
2695 let verdict =
2696 crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2697 if !verdict.is_accepted() {
2698 return Ok(EnrichItemResult::PreservationFailed {
2699 score: match verdict {
2700 crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2701 crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2702 crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2703 },
2704 threshold,
2705 chars_before,
2706 chars_after,
2707 });
2708 }
2709
2710 let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2716 let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2717 if old_hash == new_hash {
2718 return Ok(EnrichItemResult::Skipped {
2719 reason: format!(
2720 "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2721 ),
2722 });
2723 }
2724
2725 if chars_after <= chars_before {
2727 return Ok(EnrichItemResult::Skipped {
2728 reason: format!(
2729 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2730 ),
2731 });
2732 }
2733
2734 persist_enriched_body(
2735 conn,
2736 namespace,
2737 memory_id,
2738 memory_name,
2739 enriched_body,
2740 paths,
2741 llm_backend,
2742 )?;
2743
2744 Ok(EnrichItemResult::Done {
2745 memory_id: Some(memory_id),
2746 entity_id: None,
2747 entities: 0,
2748 rels: 0,
2749 chars_before: Some(chars_before),
2750 chars_after: Some(chars_after),
2751 cost,
2752 is_oauth,
2753 })
2754}
2755
2756fn call_reembed(
2757 conn: &Connection,
2758 namespace: &str,
2759 memory_name: &str,
2760 paths: &crate::paths::AppPaths,
2761 llm_backend: crate::cli::LlmBackendChoice,
2762) -> Result<EnrichItemResult, AppError> {
2763 let (memory_id, body, memory_type): (i64, String, String) = conn
2764 .query_row(
2765 "SELECT id, COALESCE(body,''), COALESCE(type,'note')
2766 FROM memories
2767 WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2768 rusqlite::params![namespace, memory_name],
2769 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2770 )
2771 .map_err(|e| match e {
2772 rusqlite::Error::QueryReturnedNoRows => {
2773 AppError::NotFound(format!("memory '{memory_name}' not found"))
2774 }
2775 other => AppError::Database(other),
2776 })?;
2777
2778 if body.trim().is_empty() {
2779 return Ok(EnrichItemResult::Skipped {
2780 reason: "body is empty".to_string(),
2781 });
2782 }
2783
2784 reembed_memory_vector(
2785 conn,
2786 namespace,
2787 memory_id,
2788 memory_name,
2789 &memory_type,
2790 &body,
2791 paths,
2792 llm_backend,
2793 )?;
2794
2795 Ok(EnrichItemResult::Done {
2796 memory_id: Some(memory_id),
2797 entity_id: None,
2798 entities: 0,
2799 rels: 0,
2800 chars_before: Some(body.chars().count()),
2801 chars_after: Some(body.chars().count()),
2802 cost: 0.0,
2803 is_oauth: true,
2804 })
2805}
2806
2807fn scan_operation(
2812 conn: &Connection,
2813 namespace: &str,
2814 args: &EnrichArgs,
2815) -> Result<Vec<String>, AppError> {
2816 let name_filter = resolve_name_filter(args)?;
2818 match args.operation {
2819 EnrichOperation::MemoryBindings => {
2820 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2821 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2822 }
2823 EnrichOperation::EntityDescriptions => {
2824 let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2825 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2826 }
2827 EnrichOperation::BodyEnrich => {
2828 let rows =
2829 scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2830 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2831 }
2832 EnrichOperation::ReEmbed => {
2833 let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
2834 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2835 }
2836 EnrichOperation::WeightCalibrate => {
2837 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2838 Ok(rows
2839 .into_iter()
2840 .map(|(id, _, _, _, _)| id.to_string())
2841 .collect())
2842 }
2843 EnrichOperation::RelationReclassify => {
2844 let rows = scan_generic_relations(conn, namespace, args.limit)?;
2845 Ok(rows
2846 .into_iter()
2847 .map(|(id, _, _, _)| id.to_string())
2848 .collect())
2849 }
2850 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2851 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2852 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2853 }
2854 EnrichOperation::EntityTypeValidate => {
2855 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2856 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2857 }
2858 EnrichOperation::DescriptionEnrich => {
2859 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2860 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2861 }
2862 EnrichOperation::DomainClassify
2863 | EnrichOperation::GraphAudit
2864 | EnrichOperation::DeepResearchSynth
2865 | EnrichOperation::BodyExtract => {
2866 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2867 let sql = format!(
2868 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2869 );
2870 let mut stmt = conn.prepare(&sql)?;
2871 let names = stmt
2872 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2873 .collect::<Result<Vec<_>, _>>()?;
2874 Ok(names)
2875 }
2876 }
2877}
2878
2879fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2885 if let Some(p) = explicit {
2886 if p.exists() {
2887 return Ok(p.to_path_buf());
2888 }
2889 return Err(AppError::Validation(format!(
2890 "Codex binary not found at explicit path: {}",
2891 p.display()
2892 )));
2893 }
2894
2895 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2896 let p = PathBuf::from(&env_path);
2897 if p.exists() {
2898 return Ok(p);
2899 }
2900 }
2901
2902 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2903 if let Some(path_var) = std::env::var_os("PATH") {
2904 for dir in std::env::split_paths(&path_var) {
2905 let candidate = dir.join(name);
2906 if candidate.exists() {
2907 return Ok(crate::extract::llm_embedding::resolve_real_binary(
2908 &candidate,
2909 ));
2910 }
2911 }
2912 }
2913
2914 Err(AppError::Validation(
2915 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2916 ))
2917}
2918
2919fn call_weight_calibrate(
2921 conn: &Connection,
2922 _namespace: &str,
2923 item_key: &str,
2924 binary: &Path,
2925 model: Option<&str>,
2926 timeout: u64,
2927 mode: &EnrichMode,
2928) -> Result<EnrichItemResult, AppError> {
2929 let rel_id: i64 = item_key
2930 .parse()
2931 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2932 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2933 .query_row(
2934 "SELECT e1.name, e2.name, r.relation, r.weight \
2935 FROM relationships r \
2936 JOIN entities e1 ON e1.id = r.source_id \
2937 JOIN entities e2 ON e2.id = r.target_id \
2938 WHERE r.id = ?1",
2939 rusqlite::params![rel_id],
2940 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2941 )
2942 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2943
2944 let input_text = format!(
2945 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2946 );
2947 let (value, cost, is_oauth) = match mode {
2948 EnrichMode::ClaudeCode => call_claude(
2949 binary,
2950 WEIGHT_CALIBRATE_PROMPT,
2951 WEIGHT_CALIBRATE_SCHEMA,
2952 &input_text,
2953 model,
2954 timeout,
2955 )?,
2956 EnrichMode::Codex => call_codex(
2957 binary,
2958 WEIGHT_CALIBRATE_PROMPT,
2959 WEIGHT_CALIBRATE_SCHEMA,
2960 &input_text,
2961 model,
2962 timeout,
2963 )?,
2964 };
2965
2966 let calibrated = value
2967 .get("calibrated_weight")
2968 .and_then(|v| v.as_f64())
2969 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2970
2971 conn.execute(
2972 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2973 rusqlite::params![calibrated, rel_id],
2974 )?;
2975
2976 Ok(EnrichItemResult::Done {
2977 memory_id: None,
2978 entity_id: None,
2979 entities: 0,
2980 rels: 1,
2981 chars_before: None,
2982 chars_after: None,
2983 cost,
2984 is_oauth,
2985 })
2986}
2987
2988fn call_relation_reclassify(
2990 conn: &Connection,
2991 _namespace: &str,
2992 item_key: &str,
2993 binary: &Path,
2994 model: Option<&str>,
2995 timeout: u64,
2996 mode: &EnrichMode,
2997) -> Result<EnrichItemResult, AppError> {
2998 let rel_id: i64 = item_key
2999 .parse()
3000 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3001 let (source_name, target_name, current_relation): (String, String, String) = conn
3002 .query_row(
3003 "SELECT e1.name, e2.name, r.relation \
3004 FROM relationships r \
3005 JOIN entities e1 ON e1.id = r.source_id \
3006 JOIN entities e2 ON e2.id = r.target_id \
3007 WHERE r.id = ?1",
3008 rusqlite::params![rel_id],
3009 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3010 )
3011 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3012
3013 let input_text = format!(
3014 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3015 );
3016 let (value, cost, is_oauth) = match mode {
3017 EnrichMode::ClaudeCode => call_claude(
3018 binary,
3019 RELATION_RECLASSIFY_PROMPT,
3020 RELATION_RECLASSIFY_SCHEMA,
3021 &input_text,
3022 model,
3023 timeout,
3024 )?,
3025 EnrichMode::Codex => call_codex(
3026 binary,
3027 RELATION_RECLASSIFY_PROMPT,
3028 RELATION_RECLASSIFY_SCHEMA,
3029 &input_text,
3030 model,
3031 timeout,
3032 )?,
3033 };
3034
3035 let new_relation = value
3036 .get("relation")
3037 .and_then(|v| v.as_str())
3038 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3039 let new_strength = value
3040 .get("strength")
3041 .and_then(|v| v.as_f64())
3042 .unwrap_or(0.5);
3043
3044 conn.execute(
3045 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3046 rusqlite::params![new_relation, new_strength, rel_id],
3047 )?;
3048
3049 Ok(EnrichItemResult::Done {
3050 memory_id: None,
3051 entity_id: None,
3052 entities: 0,
3053 rels: 1,
3054 chars_before: None,
3055 chars_after: None,
3056 cost,
3057 is_oauth,
3058 })
3059}
3060
3061fn call_entity_connect(
3063 conn: &Connection,
3064 namespace: &str,
3065 item_key: &str,
3066 binary: &Path,
3067 model: Option<&str>,
3068 timeout: u64,
3069 mode: &EnrichMode,
3070) -> Result<EnrichItemResult, AppError> {
3071 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3072 let (e1_id, e1_name, e2_id, e2_name) =
3073 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3074 Some(p) => p,
3075 None => {
3076 return Ok(EnrichItemResult::Skipped {
3077 reason: "pair no longer isolated".into(),
3078 })
3079 }
3080 };
3081 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3082 let (value, cost, is_oauth) = match mode {
3083 EnrichMode::ClaudeCode => call_claude(
3084 binary,
3085 ENTITY_CONNECT_PROMPT,
3086 ENTITY_CONNECT_SCHEMA,
3087 &input_text,
3088 model,
3089 timeout,
3090 )?,
3091 EnrichMode::Codex => call_codex(
3092 binary,
3093 ENTITY_CONNECT_PROMPT,
3094 ENTITY_CONNECT_SCHEMA,
3095 &input_text,
3096 model,
3097 timeout,
3098 )?,
3099 };
3100 let relation = value
3101 .get("relation")
3102 .and_then(|v| v.as_str())
3103 .unwrap_or("none");
3104 if relation == "none" {
3105 return Ok(EnrichItemResult::Skipped {
3106 reason: "LLM determined no relationship".into(),
3107 });
3108 }
3109 let strength = value
3110 .get("strength")
3111 .and_then(|v| v.as_f64())
3112 .unwrap_or(0.5);
3113 conn.execute(
3114 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3115 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3116 )?;
3117 Ok(EnrichItemResult::Done {
3118 memory_id: None,
3119 entity_id: None,
3120 entities: 0,
3121 rels: 1,
3122 chars_before: None,
3123 chars_after: None,
3124 cost,
3125 is_oauth,
3126 })
3127}
3128
3129fn call_entity_type_validate(
3131 conn: &Connection,
3132 _namespace: &str,
3133 item_key: &str,
3134 binary: &Path,
3135 model: Option<&str>,
3136 timeout: u64,
3137 mode: &EnrichMode,
3138) -> Result<EnrichItemResult, AppError> {
3139 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3140 .query_row(
3141 "SELECT id, name, type FROM entities WHERE name = ?1",
3142 rusqlite::params![item_key],
3143 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3144 )
3145 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3146 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3147 let (value, cost, is_oauth) = match mode {
3148 EnrichMode::ClaudeCode => call_claude(
3149 binary,
3150 ENTITY_TYPE_VALIDATE_PROMPT,
3151 ENTITY_TYPE_VALIDATE_SCHEMA,
3152 &input_text,
3153 model,
3154 timeout,
3155 )?,
3156 EnrichMode::Codex => call_codex(
3157 binary,
3158 ENTITY_TYPE_VALIDATE_PROMPT,
3159 ENTITY_TYPE_VALIDATE_SCHEMA,
3160 &input_text,
3161 model,
3162 timeout,
3163 )?,
3164 };
3165 let validated_type = value
3166 .get("validated_type")
3167 .and_then(|v| v.as_str())
3168 .unwrap_or(&ent_type);
3169 let was_correct = value
3170 .get("was_correct")
3171 .and_then(|v| v.as_bool())
3172 .unwrap_or(true);
3173 if !was_correct {
3174 conn.execute(
3175 "UPDATE entities SET type = ?1 WHERE id = ?2",
3176 rusqlite::params![validated_type, ent_id],
3177 )?;
3178 }
3179 Ok(EnrichItemResult::Done {
3180 memory_id: None,
3181 entity_id: Some(ent_id),
3182 entities: 1,
3183 rels: 0,
3184 chars_before: None,
3185 chars_after: None,
3186 cost,
3187 is_oauth,
3188 })
3189}
3190
3191fn call_description_enrich(
3193 conn: &Connection,
3194 _namespace: &str,
3195 item_key: &str,
3196 binary: &Path,
3197 model: Option<&str>,
3198 timeout: u64,
3199 mode: &EnrichMode,
3200) -> Result<EnrichItemResult, AppError> {
3201 let (mem_id, body, old_desc): (i64, String, String) = conn
3202 .query_row(
3203 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3204 rusqlite::params![item_key],
3205 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3206 )
3207 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3208 let snippet: String = body.chars().take(500).collect();
3209 let input_text = format!(
3210 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3211 );
3212 let (value, cost, is_oauth) = match mode {
3213 EnrichMode::ClaudeCode => call_claude(
3214 binary,
3215 DESCRIPTION_ENRICH_PROMPT,
3216 DESCRIPTION_ENRICH_SCHEMA,
3217 &input_text,
3218 model,
3219 timeout,
3220 )?,
3221 EnrichMode::Codex => call_codex(
3222 binary,
3223 DESCRIPTION_ENRICH_PROMPT,
3224 DESCRIPTION_ENRICH_SCHEMA,
3225 &input_text,
3226 model,
3227 timeout,
3228 )?,
3229 };
3230 let new_desc = value
3231 .get("description")
3232 .and_then(|v| v.as_str())
3233 .unwrap_or(&old_desc);
3234 conn.execute(
3235 "UPDATE memories SET description = ?1 WHERE id = ?2",
3236 rusqlite::params![new_desc, mem_id],
3237 )?;
3238 Ok(EnrichItemResult::Done {
3239 memory_id: Some(mem_id),
3240 entity_id: None,
3241 entities: 0,
3242 rels: 0,
3243 chars_before: Some(old_desc.len()),
3244 chars_after: Some(new_desc.len()),
3245 cost,
3246 is_oauth,
3247 })
3248}
3249
3250fn call_domain_classify(
3252 conn: &Connection,
3253 _namespace: &str,
3254 item_key: &str,
3255 binary: &Path,
3256 model: Option<&str>,
3257 timeout: u64,
3258 mode: &EnrichMode,
3259) -> Result<EnrichItemResult, AppError> {
3260 let (mem_id, body, desc): (i64, String, String) = conn
3261 .query_row(
3262 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3263 rusqlite::params![item_key],
3264 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3265 )
3266 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3267 let snippet: String = body.chars().take(500).collect();
3268 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3269 let (value, cost, is_oauth) = match mode {
3270 EnrichMode::ClaudeCode => call_claude(
3271 binary,
3272 DOMAIN_CLASSIFY_PROMPT,
3273 DOMAIN_CLASSIFY_SCHEMA,
3274 &input_text,
3275 model,
3276 timeout,
3277 )?,
3278 EnrichMode::Codex => call_codex(
3279 binary,
3280 DOMAIN_CLASSIFY_PROMPT,
3281 DOMAIN_CLASSIFY_SCHEMA,
3282 &input_text,
3283 model,
3284 timeout,
3285 )?,
3286 };
3287 let domain = value
3288 .get("domain")
3289 .and_then(|v| v.as_str())
3290 .unwrap_or("uncategorized");
3291 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3292 conn.execute(
3293 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3294 rusqlite::params![metadata, mem_id],
3295 )?;
3296 Ok(EnrichItemResult::Done {
3297 memory_id: Some(mem_id),
3298 entity_id: None,
3299 entities: 0,
3300 rels: 0,
3301 chars_before: None,
3302 chars_after: None,
3303 cost,
3304 is_oauth,
3305 })
3306}
3307
3308fn call_graph_audit(
3310 conn: &Connection,
3311 _namespace: &str,
3312 item_key: &str,
3313 binary: &Path,
3314 model: Option<&str>,
3315 timeout: u64,
3316 mode: &EnrichMode,
3317) -> Result<EnrichItemResult, AppError> {
3318 let (mem_id, body, desc): (i64, String, String) = conn
3319 .query_row(
3320 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3321 rusqlite::params![item_key],
3322 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3323 )
3324 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3325 let snippet: String = body.chars().take(500).collect();
3326 let ent_count: i64 = conn
3327 .query_row(
3328 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3329 rusqlite::params![mem_id],
3330 |r| r.get(0),
3331 )
3332 .unwrap_or(0);
3333 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3334 let (value, cost, is_oauth) = match mode {
3335 EnrichMode::ClaudeCode => call_claude(
3336 binary,
3337 GRAPH_AUDIT_PROMPT,
3338 GRAPH_AUDIT_SCHEMA,
3339 &input_text,
3340 model,
3341 timeout,
3342 )?,
3343 EnrichMode::Codex => call_codex(
3344 binary,
3345 GRAPH_AUDIT_PROMPT,
3346 GRAPH_AUDIT_SCHEMA,
3347 &input_text,
3348 model,
3349 timeout,
3350 )?,
3351 };
3352 let issues = value
3353 .get("issues")
3354 .and_then(|v| v.as_array())
3355 .map(|a| a.len())
3356 .unwrap_or(0);
3357 Ok(EnrichItemResult::Done {
3358 memory_id: Some(mem_id),
3359 entity_id: None,
3360 entities: 0,
3361 rels: issues,
3362 chars_before: None,
3363 chars_after: None,
3364 cost,
3365 is_oauth,
3366 })
3367}
3368
3369fn call_deep_research_synth(
3371 conn: &Connection,
3372 namespace: &str,
3373 item_key: &str,
3374 binary: &Path,
3375 model: Option<&str>,
3376 timeout: u64,
3377 mode: &EnrichMode,
3378) -> Result<EnrichItemResult, AppError> {
3379 let (mem_id, body): (i64, String) = conn
3380 .query_row(
3381 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3382 rusqlite::params![item_key],
3383 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3384 )
3385 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3386 let snippet: String = body.chars().take(2000).collect();
3387 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3388 let (value, cost, is_oauth) = match mode {
3389 EnrichMode::ClaudeCode => call_claude(
3390 binary,
3391 DEEP_RESEARCH_SYNTH_PROMPT,
3392 DEEP_RESEARCH_SYNTH_SCHEMA,
3393 &input_text,
3394 model,
3395 timeout,
3396 )?,
3397 EnrichMode::Codex => call_codex(
3398 binary,
3399 DEEP_RESEARCH_SYNTH_PROMPT,
3400 DEEP_RESEARCH_SYNTH_SCHEMA,
3401 &input_text,
3402 model,
3403 timeout,
3404 )?,
3405 };
3406 let mut ent_count = 0usize;
3407 let mut rel_count = 0usize;
3408 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3409 for e in ents {
3410 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3411 let etype_str = e
3412 .get("entity_type")
3413 .and_then(|v| v.as_str())
3414 .unwrap_or("concept");
3415 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3416 if name.len() >= 2 {
3417 let ne = NewEntity {
3418 name: name.to_string(),
3419 entity_type: etype,
3420 description: None,
3421 };
3422 let _ = entities::upsert_entity(conn, namespace, &ne);
3423 ent_count += 1;
3424 }
3425 }
3426 }
3427 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3428 for r in rels {
3429 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3430 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3431 if src.is_empty() || tgt.is_empty() {
3432 continue;
3433 }
3434 let rel = r
3435 .get("relation")
3436 .and_then(|v| v.as_str())
3437 .unwrap_or("related");
3438 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3439 if let (Some(sid), Some(tid)) = (
3440 entities::find_entity_id(conn, namespace, src)?,
3441 entities::find_entity_id(conn, namespace, tgt)?,
3442 ) {
3443 let _ = entities::create_or_fetch_relationship(
3444 conn, namespace, sid, tid, rel, str_, None,
3445 );
3446 rel_count += 1;
3447 }
3448 }
3449 }
3450 Ok(EnrichItemResult::Done {
3451 memory_id: Some(mem_id),
3452 entity_id: None,
3453 entities: ent_count,
3454 rels: rel_count,
3455 chars_before: None,
3456 chars_after: None,
3457 cost,
3458 is_oauth,
3459 })
3460}
3461
3462fn call_body_extract(
3464 conn: &Connection,
3465 _namespace: &str,
3466 item_key: &str,
3467 binary: &Path,
3468 model: Option<&str>,
3469 timeout: u64,
3470 mode: &EnrichMode,
3471) -> Result<EnrichItemResult, AppError> {
3472 let (mem_id, body): (i64, String) = conn
3473 .query_row(
3474 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3475 rusqlite::params![item_key],
3476 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3477 )
3478 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3479 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3480 let (value, cost, is_oauth) = match mode {
3481 EnrichMode::ClaudeCode => call_claude(
3482 binary,
3483 BODY_EXTRACT_PROMPT,
3484 BODY_EXTRACT_SCHEMA,
3485 &input_text,
3486 model,
3487 timeout,
3488 )?,
3489 EnrichMode::Codex => call_codex(
3490 binary,
3491 BODY_EXTRACT_PROMPT,
3492 BODY_EXTRACT_SCHEMA,
3493 &input_text,
3494 model,
3495 timeout,
3496 )?,
3497 };
3498 let restructured = value
3499 .get("restructured_body")
3500 .and_then(|v| v.as_str())
3501 .unwrap_or(&body);
3502 let chars_before = body.len();
3503 let chars_after = restructured.len();
3504 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3505 conn.execute(
3506 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3507 rusqlite::params![restructured, new_hash, mem_id],
3508 )?;
3509 Ok(EnrichItemResult::Done {
3510 memory_id: Some(mem_id),
3511 entity_id: None,
3512 entities: 0,
3513 rels: 0,
3514 chars_before: Some(chars_before),
3515 chars_after: Some(chars_after),
3516 cost,
3517 is_oauth,
3518 })
3519}
3520
3521#[allow(clippy::type_complexity)]
3523fn scan_isolated_entity_pairs(
3524 conn: &Connection,
3525 namespace: &str,
3526 limit: Option<usize>,
3527) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3528 let limit_val = limit.unwrap_or(50) as i64;
3529 let mut stmt = conn.prepare_cached(
3530 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3531 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3532 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3533 (r.source_id = e1.id AND r.target_id = e2.id) OR \
3534 (r.source_id = e2.id AND r.target_id = e1.id)) \
3535 LIMIT ?2",
3536 )?;
3537 let rows = stmt
3538 .query_map(rusqlite::params![namespace, limit_val], |r| {
3539 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3540 })?
3541 .collect::<Result<Vec<_>, _>>()?;
3542 Ok(rows)
3543}
3544
3545fn scan_entities_for_type_validation(
3547 conn: &Connection,
3548 namespace: &str,
3549 limit: Option<usize>,
3550) -> Result<Vec<(i64, String, String)>, AppError> {
3551 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3552 let sql = format!(
3553 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3554 );
3555 let mut stmt = conn.prepare(&sql)?;
3556 let rows = stmt
3557 .query_map(rusqlite::params![namespace], |r| {
3558 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3559 })?
3560 .collect::<Result<Vec<_>, _>>()?;
3561 Ok(rows)
3562}
3563
3564fn scan_generic_descriptions(
3566 conn: &Connection,
3567 namespace: &str,
3568 limit: Option<usize>,
3569) -> Result<Vec<(i64, String, String)>, AppError> {
3570 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3571 let sql = format!(
3572 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3573 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3574 ORDER BY id {limit_clause}"
3575 );
3576 let mut stmt = conn.prepare(&sql)?;
3577 let rows = stmt
3578 .query_map(rusqlite::params![namespace], |r| {
3579 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3580 })?
3581 .collect::<Result<Vec<_>, _>>()?;
3582 Ok(rows)
3583}
3584
3585fn call_codex(
3589 binary: &Path,
3590 prompt: &str,
3591 json_schema: &str,
3592 input_text: &str,
3593 model: Option<&str>,
3594 timeout_secs: u64,
3595) -> Result<(serde_json::Value, f64, bool), AppError> {
3596 use wait_timeout::ChildExt;
3597
3598 super::codex_spawn::validate_codex_model(model)?;
3603 let schema_file = super::codex_spawn::trusted_schema_path()?;
3604
3605 let args = super::codex_spawn::CodexSpawnArgs {
3606 binary,
3607 prompt,
3608 json_schema,
3609 input_text,
3610 model,
3611 timeout_secs,
3612 schema_path: schema_file.clone(),
3613 };
3614 let mut cmd = super::codex_spawn::build_codex_command(&args);
3615
3616 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3617 AppError::Io(std::io::Error::new(
3618 e.kind(),
3619 format!("failed to spawn codex: {e}"),
3620 ))
3621 })?;
3622
3623 let full_prompt = format!("{prompt}\n\n{input_text}");
3624 let stdin_bytes = full_prompt.into_bytes();
3625 let mut child_stdin = child
3626 .stdin
3627 .take()
3628 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3629 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3630 child_stdin.write_all(&stdin_bytes)?;
3631 drop(child_stdin);
3632 Ok(())
3633 });
3634
3635 let start = std::time::Instant::now();
3636 let timeout = std::time::Duration::from_secs(timeout_secs);
3637 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3638 let _ = std::fs::remove_file(&schema_file);
3639
3640 match status {
3641 Some(exit_status) => {
3642 stdin_thread
3643 .join()
3644 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3645 .map_err(AppError::Io)?;
3646
3647 tracing::debug!(
3648 target: "process",
3649 exit_code = ?exit_status.code(),
3650 elapsed_ms = start.elapsed().as_millis() as u64,
3651 "external process completed"
3652 );
3653
3654 let mut stdout_buf = Vec::new();
3655 if let Some(mut out) = child.stdout.take() {
3656 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3657 }
3658 if !exit_status.success() {
3659 let mut stderr_buf = Vec::new();
3660 if let Some(mut err) = child.stderr.take() {
3661 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3662 }
3663 let stderr_str = String::from_utf8_lossy(&stderr_buf);
3664 tracing::warn!(
3665 target: "enrich",
3666 exit_code = ?exit_status.code(),
3667 stderr = %stderr_str.trim(),
3668 "codex process failed"
3669 );
3670 return Err(AppError::Validation(format!(
3671 "codex exited with code {:?}: {}",
3672 exit_status.code(),
3673 stderr_str.trim()
3674 )));
3675 }
3676 let stdout_str = String::from_utf8(stdout_buf)
3677 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3678 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3681 let value: serde_json::Value =
3687 serde_json::from_str(&result.last_agent_text).map_err(|e| {
3688 AppError::Validation(format!(
3689 "codex agent_message is not valid JSON: {e}; raw={}",
3690 result.last_agent_text
3691 ))
3692 })?;
3693 Ok((value, 0.0, false))
3694 }
3695 None => {
3696 let _ = child.kill();
3697 let _ = child.wait();
3698 let _ = stdin_thread.join();
3699 Err(AppError::Validation(format!(
3700 "codex timed out after {timeout_secs} seconds"
3701 )))
3702 }
3703 }
3704}
3705
/// Unit tests for the scan helpers, the Claude/Codex output handling and the
/// embedded JSON schemas.
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory database holding the tables the tests below touch:
    /// `memories`, `entities`, `memory_entities`, `relationships` and
    /// `memory_embeddings`.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL DEFAULT '',
                dim INTEGER NOT NULL DEFAULT 384,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// A memory with no `memory_entities` row is reported as unbound.
    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    /// A memory linked to an entity through `memory_entities` is not reported.
    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    /// An entity whose description is NULL is picked up by the scan.
    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    /// An entity that already has a description is skipped by the scan.
    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    /// A two-character body is reported when the threshold argument is 100.
    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    /// A 1000-character body is not reported when the threshold argument is 100.
    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    /// With five matching memories and a limit of three, three rows come back.
    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    /// Only the memory without a stored embedding is reported.
    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
            [],
        )
        .unwrap();
        let memory_id: i64 = conn
            .query_row(
                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    /// When a name list is passed, only the listed memory is reported.
    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
            [],
        )
        .unwrap();

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    /// A freshly opened queue database exposes an empty `queue` table.
    #[test]
    fn queue_db_schema_creates_correctly() {
        // A private temp directory keeps the test portable and removes the
        // database (and any SQLite side files) even when an assertion fails.
        let tmp = tempfile::tempdir().expect("tempdir");
        let db_path = tmp
            .path()
            .join("enrich-queue.sqlite")
            .to_string_lossy()
            .into_owned();
        let conn = open_queue_db(&db_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    /// A successful result event yields the structured output and its cost,
    /// and is not flagged as OAuth.
    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    /// An init event with `apiKeySource` set to `none` is reported as OAuth.
    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    /// An error result mentioning `rate_limit` maps to `AppError::RateLimited`.
    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// The error text of a failed result is carried into the returned error.
    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// `call_codex` returns the agent message JSON as-is, here a body-enrich
    /// payload, with zero cost and no OAuth flag. The Codex binary is replaced
    /// by a shell script that prints a fixed JSONL transcript.
    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    /// Checks that the short-body scan alone lists a candidate memory.
    ///
    /// NOTE(review): despite its name, this test only exercises
    /// `scan_short_body_memories`; it does not drive a dry-run code path.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    /// `persist_entity_description` stores the given text on the entity row.
    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    /// `BINDINGS_SCHEMA` parses as JSON.
    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    /// `ENTITY_DESCRIPTION_SCHEMA` parses as JSON.
    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    /// `BODY_ENRICH_SCHEMA` parses as JSON.
    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}