1use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
/// Default file name of the enrichment work-queue SQLite database (presumably the path
/// handed to `open_queue_db` when no override is given — TODO confirm at the call site).
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default rate-limit back-off; the unit is presumably seconds — TODO confirm where it is used.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default for `--min-output-chars` (body-enrich lower length bound, in characters).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default for `--max-output-chars` (body-enrich upper length bound, in characters).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
50
/// JSON Schema for entity-binding extraction responses (presumably the `memory-bindings`
/// operation, paired with `BINDINGS_PROMPT`).
///
/// Requires `entities` (name plus one of ten entity types) and `relationships`
/// (source, target, canonical relation, strength in 0..=1). The relation enum matches the
/// list in `BINDINGS_PROMPT`; the entity-type enum matches the list in
/// `ENTITY_TYPE_VALIDATE_PROMPT`. Every object sets `additionalProperties: false`.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;

/// JSON Schema for an entity-description response: a single required `description` string.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for a body-enrich response: a single required `enriched_body` string.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
112
/// System prompt for relationship-weight calibration (by name, the `weight-calibrate` pass).
///
/// The 0.9 / 0.7 / 0.5 / 0.3 scale matches the strength guidance in `BINDINGS_PROMPT`.
/// Trailing `\` continuations drop the newline and leading whitespace of the next line,
/// so the literal contains only the explicit `\n` escapes.
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// Response schema for `WEIGHT_CALIBRATE_PROMPT`: a weight in 0..=1 plus reasoning.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt asking the model to replace a generic relation type with one of the
/// eleven canonical relations (by name, the `relation-reclassify` pass).
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// Response schema for `RELATION_RECLASSIFY_PROMPT`.
///
/// NOTE(review): unlike `BINDINGS_SCHEMA`, `relation` is a free string with no enum, so
/// the canonical-relation constraint is only requested in the prompt. Presumably the
/// caller validates the value — confirm at the use site.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
158
/// System prompt asking whether two unconnected entities share a meaningful relation
/// (by name, the `entity-connect` pass). The model answers `"none"` when they do not.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// Response schema for `ENTITY_CONNECT_PROMPT`. `relation` is a free string so the
/// `"none"` sentinel requested by the prompt can be returned.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt that checks an entity's type against the ten supported entity types
/// (by name, the `entity-type-validate` pass).
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// Response schema for `ENTITY_TYPE_VALIDATE_PROMPT`: the validated type, whether the
/// original was already correct, and reasoning. `validated_type` carries no enum here.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt that rewrites a generic or auto-generated memory description into a
/// 10-20 word semantic summary (by name, the `description-enrich` pass).
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// Response schema for `DESCRIPTION_ENRICH_PROMPT`: new description plus reasoning.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
208
/// System prompt that assigns a memory to one kebab-case domain (by name, the
/// `domain-classify` pass).
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// Response schema for `DOMAIN_CLASSIFY_PROMPT`.
///
/// NOTE(review): the schema also requires a `confidence` in 0..=1 that the prompt text
/// never asks for; structured output will force it anyway.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
223
/// System prompt for a per-memory knowledge-graph quality audit (by name, the
/// `graph-audit` pass).
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// Response schema for `GRAPH_AUDIT_PROMPT`: a 0..=1 quality score, a list of
/// `{kind, detail}` issues, and reasoning.
///
/// Every object, including the nested issue items, sets `additionalProperties: false`.
/// Strict structured-output backends reject object schemas that omit it, and this keeps
/// the schema consistent with the other schemas in this file.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
239
/// System prompt that extracts entities and relationships from a memory body and writes
/// a synthesis summary (by name, the `deep-research-synth` pass).
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// Response schema for `DEEP_RESEARCH_SYNTH_PROMPT`: entities, relationships and a summary.
///
/// The nested entity and relationship item objects set `additionalProperties: false`,
/// because strict structured-output backends reject object schemas without it. Relationship
/// `strength` is bounded to 0..=1, as in `BINDINGS_SCHEMA`.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": { "type": "string" }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": { "type": "string" },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
256
/// System prompt that restructures an unstructured memory body into clean markdown while
/// keeping all facts (by name, the `body-extract` pass).
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// Response schema for `BODY_EXTRACT_PROMPT`: the rewritten body plus a change summary.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;

/// System prompt for entity/relationship extraction, paired with `BINDINGS_SCHEMA`.
///
/// The allowed relation list here matches the relation enum in `BINDINGS_SCHEMA`. The
/// strength guidance matches the scale in `WEIGHT_CALIBRATE_PROMPT`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Leading part of the entity-description prompt. It ends with `"Entity name: "`, so the
/// caller presumably appends the entity name (and type) directly — confirm at the use site.
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading part of the body-enrich prompt. It ends right before the memory body, which the
/// caller presumably appends. The text says the target length arrives via separate
/// "system context" — TODO confirm how `--min/max-output-chars` are injected.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
/// Enrichment pass selected with `--operation`.
///
/// clap's `ValueEnum` derive and `#[serde(rename_all = "kebab-case")]` both spell variants
/// in kebab-case (e.g. `memory-bindings`, `re-embed`). Variant notes below use `//` on
/// purpose, because clap turns `///` comments on value variants into `--help` text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Add entity bindings to memories that have none (candidates presumably from
    // `scan_unbound_memories`).
    MemoryBindings,
    // Fill empty entity descriptions (candidates presumably from
    // `scan_entities_without_description`).
    EntityDescriptions,
    // Expand short memory bodies (GAP-18); bounded by --min/--max-output-chars.
    BodyEnrich,
    // Rebuild only missing memory embeddings without rewriting bodies.
    ReEmbed,
    // Re-check relationship strengths (WEIGHT_CALIBRATE_PROMPT).
    WeightCalibrate,
    // Replace generic relation types with a canonical one (RELATION_RECLASSIFY_PROMPT).
    RelationReclassify,
    // Propose relations between unconnected entities (ENTITY_CONNECT_PROMPT).
    EntityConnect,
    // Verify entity types (ENTITY_TYPE_VALIDATE_PROMPT).
    EntityTypeValidate,
    // Replace generic memory descriptions (DESCRIPTION_ENRICH_PROMPT).
    DescriptionEnrich,
    // No prompt for this pass is visible in this part of the file; see its handler.
    CrossDomainBridges,
    // Assign a primary domain to a memory (DOMAIN_CLASSIFY_PROMPT).
    DomainClassify,
    // Score a memory's bindings and list quality issues (GRAPH_AUDIT_PROMPT).
    GraphAudit,
    // Extract entities/relations plus a synthesis summary (DEEP_RESEARCH_SYNTH_PROMPT).
    DeepResearchSynth,
    // Restructure an unstructured body into clean markdown (BODY_EXTRACT_PROMPT).
    BodyExtract,
}
328
/// LLM backend that runs the enrichment prompts (`--mode` / `--fallback-mode`).
///
/// The `Display` impl writes the same kebab-case names clap accepts on the command line.
/// Variant notes use `//` so clap help output stays unchanged.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Claude Code CLI (`claude-code`); binary located via `find_claude_binary`.
    ClaudeCode,
    // Codex CLI (`codex`); binary located via `find_codex_binary`.
    Codex,
    // opencode CLI; the explicit value name pins the CLI spelling to `opencode`.
    #[value(name = "opencode")]
    Opencode,
}
340
341impl std::fmt::Display for EnrichMode {
342 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343 match self {
344 EnrichMode::ClaudeCode => write!(f, "claude-code"),
345 EnrichMode::Codex => write!(f, "codex"),
346 EnrichMode::Opencode => write!(f, "opencode"),
347 }
348 }
349}
350
351#[derive(clap::Args)]
353#[command(
354 about = "Enrich graph memories and entities using an LLM provider",
355 after_long_help = "EXAMPLES:\n \
356 # Add missing entity bindings to all unbound memories\n \
357 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
358 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
359 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
360 # Expand short memory bodies (GAP-18)\n \
361 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
362 # Rebuild only missing memory embeddings without rewriting bodies\n \
363 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
364 # Resume an interrupted body-enrich run\n \
365 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
366 # Retry only failed items from a previous run\n \
367 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
368 EXIT CODES:\n \
369 0 success\n \
370 1 validation error (bad args, binary not found)\n \
371 14 I/O error"
372)]
373pub struct EnrichArgs {
374 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
376 pub operation: EnrichOperation,
377
378 #[arg(long, value_enum, default_value = "claude-code")]
380 pub mode: EnrichMode,
381
382 #[arg(long, value_name = "N")]
384 pub limit: Option<usize>,
385
386 #[arg(long)]
388 pub dry_run: bool,
389
390 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
392 pub namespace: Option<String>,
393
394 #[arg(long, value_name = "PATH")]
397 pub claude_binary: Option<PathBuf>,
398
399 #[arg(long, value_name = "MODEL")]
401 pub claude_model: Option<String>,
402
403 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
405 pub claude_timeout: u64,
406
407 #[arg(long, value_name = "PATH")]
410 pub codex_binary: Option<PathBuf>,
411
412 #[arg(long, value_name = "MODEL")]
414 pub codex_model: Option<String>,
415
416 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
418 pub codex_timeout: u64,
419
420 #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
423 pub opencode_binary: Option<PathBuf>,
424
425 #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
427 pub opencode_model: Option<String>,
428
429 #[arg(
431 long,
432 value_name = "SECONDS",
433 env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
434 default_value_t = 300
435 )]
436 pub opencode_timeout: u64,
437
438 #[arg(long, value_name = "USD")]
441 pub max_cost_usd: Option<f64>,
442
443 #[arg(long)]
446 pub resume: bool,
447
448 #[arg(long)]
450 pub retry_failed: bool,
451
452 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
455 pub min_output_chars: usize,
456
457 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
459 pub max_output_chars: usize,
460
461 #[arg(long, default_value_t = true)]
463 pub preserve_check: bool,
464
465 #[arg(long, value_name = "PATH")]
467 pub prompt_template: Option<PathBuf>,
468
469 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
473 pub llm_parallelism: u32,
474
475 #[arg(long)]
478 pub json: bool,
479
480 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
482 pub db: Option<String>,
483
484 #[arg(long, value_name = "SECONDS")]
487 pub wait_job_singleton: Option<u64>,
488
489 #[arg(long, default_value_t = false)]
493 pub force_job_singleton: bool,
494
495 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
499 pub names: Vec<String>,
500
501 #[arg(long, value_name = "PATH")]
505 pub names_file: Option<PathBuf>,
506
507 #[arg(long, default_value_t = false)]
511 pub preflight_check: bool,
512
513 #[arg(long, value_enum)]
517 pub fallback_mode: Option<EnrichMode>,
518
519 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
522 pub rate_limit_buffer: u64,
523
524 #[arg(long, default_value_t = true)]
528 pub max_load_check: bool,
529
530 #[arg(long, value_name = "N", default_value_t = 5)]
533 pub circuit_breaker_threshold: u32,
534
535 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
542 pub preserve_threshold: f64,
543
544 #[arg(long, default_value_t = true)]
549 pub codex_model_validate: bool,
550
551 #[arg(long, value_name = "MODEL")]
556 pub codex_model_fallback: Option<String>,
557}
558
/// Progress record for one phase of an enrich run.
///
/// Serialized with serde; `None` fields are omitted. Presumably written as a JSON line via
/// `emit_json` (alias of `crate::output::emit_json_line`) — confirm at call sites.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label.
    phase: &'a str,
    /// Backend binary path, when relevant to this phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Backend version string, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of work items, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Items still pending, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Effective `--llm-parallelism`, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
582
/// Per-item progress record; `None` fields are omitted from the serialized output.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Identifier of the work item (presumably the queue's `item_key`).
    item: &'a str,
    /// Item status label.
    status: &'a str,
    /// Memory id, for memory-scoped operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity id, for entity-scoped operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities written for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships written for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before rewriting (body-rewriting operations).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after rewriting (body-rewriting operations).
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Reported LLM cost for this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on the item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item in the run (index base not visible here — confirm).
    index: usize,
    /// Total number of items in the run.
    total: usize,
}
609
/// Final summary record of an enrich run.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field; presumably always `true` so consumers can tell the summary apart from
    /// per-item records.
    summary: bool,
    /// Operation name.
    operation: String,
    /// Number of items considered.
    items_total: usize,
    /// Items that finished successfully.
    completed: usize,
    /// Items that failed.
    failed: usize,
    /// Items that were skipped.
    skipped: usize,
    /// Total reported LLM cost, in USD.
    cost_usd: f64,
    /// Wall-clock duration of the run, in milliseconds.
    elapsed_ms: u64,
    /// Backend actually invoked; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
627
628use crate::output::emit_json_line as emit_json;
629
630fn open_queue_db(path: &str) -> Result<Connection, AppError> {
645 let conn = Connection::open(path)?;
646 conn.pragma_update(None, "journal_mode", "wal")?;
647 conn.execute_batch(
648 "CREATE TABLE IF NOT EXISTS queue (
649 id INTEGER PRIMARY KEY AUTOINCREMENT,
650 item_key TEXT NOT NULL UNIQUE,
651 item_type TEXT NOT NULL DEFAULT 'memory',
652 status TEXT NOT NULL DEFAULT 'pending',
653 memory_id INTEGER,
654 entity_id INTEGER,
655 entities INTEGER DEFAULT 0,
656 rels INTEGER DEFAULT 0,
657 error TEXT,
658 cost_usd REAL DEFAULT 0.0,
659 attempt INTEGER DEFAULT 0,
660 elapsed_ms INTEGER,
661 created_at TEXT DEFAULT (datetime('now')),
662 done_at TEXT
663 );
664 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
665 )?;
666 Ok(conn)
667}
668
/// Runs one structured Claude call and flattens the runner's result into a tuple.
///
/// Thin wrapper over `claude_runner::run_claude`; every argument is forwarded unchanged.
///
/// # Returns
/// `(value, cost_usd, is_oauth)` copied from the runner's result: the returned value
/// (presumably conforming to `json_schema`), the reported cost in USD, and the runner's
/// OAuth flag.
///
/// # Errors
/// Any error from `run_claude`, propagated with `?`.
fn call_claude(
    binary: &Path,
    prompt: &str,
    json_schema: &str,
    input_text: &str,
    model: Option<&str>,
    timeout_secs: u64,
) -> Result<(serde_json::Value, f64, bool), AppError> {
    let result = crate::commands::claude_runner::run_claude(
        binary,
        prompt,
        json_schema,
        input_text,
        model,
        timeout_secs,
        // NOTE(review): unexplained literal; presumably a retry/attempt or turn budget
        // inside `run_claude`. TODO confirm and promote to a named constant.
        7,
    )?;
    Ok((result.value, result.cost_usd, result.is_oauth))
}
695
/// Result of `run_preflight_probe`.
enum PreflightOutcome {
    /// The probe process exited successfully.
    Healthy,
    /// The probe exited unsuccessfully and its stderr matched a rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the failed probe.
        reason: String,
        /// Fixed, backend-specific hint on how to proceed.
        suggestion: &'static str,
    },
    /// Any other failure while setting up, running or evaluating the probe.
    Error(AppError),
}
714
715fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
723 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
724
725 match args.mode {
726 EnrichMode::ClaudeCode => {
727 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
728 Ok(b) => b,
729 Err(e) => return PreflightOutcome::Error(e),
730 };
731 let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
736 Ok(p) => p,
737 Err(e) => {
738 return PreflightOutcome::Error(AppError::Io(e));
739 }
740 };
741 let mut cmd = std::process::Command::new(&bin);
742 crate::spawn::env_whitelist::apply_env_whitelist(
743 &mut cmd,
744 crate::spawn::env_whitelist::is_strict_env_clear(),
745 );
746 if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
747 return PreflightOutcome::Error(e);
748 }
749 cmd.arg("-p")
750 .arg("ping")
751 .arg("--max-turns")
752 .arg("1")
753 .arg("--strict-mcp-config")
754 .arg("--mcp-config")
755 .arg(mcp_config_path.as_os_str())
756 .arg("--dangerously-skip-permissions")
757 .arg("--settings")
758 .arg("{\"hooks\":{}}")
759 .arg("--output-format")
760 .arg("json")
761 .stdin(std::process::Stdio::null())
762 .stdout(std::process::Stdio::piped())
763 .stderr(std::process::Stdio::piped());
764
765 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
766 Ok(c) => c,
767 Err(e) => {
768 return PreflightOutcome::Error(AppError::Io(e));
769 }
770 };
771 let output = match wait_with_timeout(child, timeout) {
772 Ok(out) => out,
773 Err(e) => return PreflightOutcome::Error(e),
774 };
775 if !output.status.success() {
776 let stderr = String::from_utf8_lossy(&output.stderr);
777 if stderr.contains("hit your session limit")
778 || stderr.contains("rate_limit")
779 || stderr.contains("429")
780 {
781 return PreflightOutcome::RateLimited {
782 reason: stderr.trim().to_string(),
783 suggestion:
784 "wait for the OAuth window to reset or use --fallback-mode codex",
785 };
786 }
787 return PreflightOutcome::Error(AppError::Validation(format!(
788 "preflight probe failed: {stderr}",
789 stderr = stderr.trim()
790 )));
791 }
792 PreflightOutcome::Healthy
793 }
794 EnrichMode::Codex => {
795 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
796 Ok(b) => b,
797 Err(e) => return PreflightOutcome::Error(e),
798 };
799 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
800 .map_err(PreflightOutcome::Error)
801 .ok();
802 let schema = "{}";
803 let schema_path = match super::codex_spawn::trusted_schema_path() {
804 Ok(p) => p,
805 Err(e) => return PreflightOutcome::Error(e),
806 };
807 let spawn_args = super::codex_spawn::CodexSpawnArgs {
808 binary: &bin,
809 prompt: "ping",
810 json_schema: schema,
811 input_text: "",
812 model: args.codex_model.as_deref(),
813 timeout_secs: args.rate_limit_buffer.max(60),
814 schema_path: schema_path.clone(),
815 };
816 let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
817 Ok(c) => c,
818 Err(e) => return PreflightOutcome::Error(e),
819 };
820 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
821 Ok(c) => c,
822 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
823 };
824 let output = match wait_with_timeout(child, timeout) {
825 Ok(out) => out,
826 Err(e) => return PreflightOutcome::Error(e),
827 };
828 let _ = std::fs::remove_file(&schema_path);
829 if !output.status.success() {
830 let stderr = String::from_utf8_lossy(&output.stderr);
831 if stderr.contains("rate_limit")
832 || stderr.contains("429")
833 || stderr.contains("Too Many Requests")
834 {
835 return PreflightOutcome::RateLimited {
836 reason: stderr.trim().to_string(),
837 suggestion: "wait for the rate-limit window to reset",
838 };
839 }
840 return PreflightOutcome::Error(AppError::Validation(format!(
841 "preflight probe failed: {stderr}",
842 stderr = stderr.trim()
843 )));
844 }
845 PreflightOutcome::Healthy
846 }
847 EnrichMode::Opencode => {
848 let bin = match super::opencode_runner::find_opencode_binary_with_override(
849 args.opencode_binary.as_deref(),
850 ) {
851 Ok(b) => b,
852 Err(e) => return PreflightOutcome::Error(e),
853 };
854 let model =
855 super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
856 let mut cmd =
857 match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
858 {
859 Ok(c) => c,
860 Err(e) => return PreflightOutcome::Error(e),
861 };
862 let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
863 Ok(c) => c,
864 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
865 };
866 let output = match wait_with_timeout(child, timeout) {
867 Ok(out) => out,
868 Err(e) => return PreflightOutcome::Error(e),
869 };
870 if !output.status.success() {
871 let stderr = String::from_utf8_lossy(&output.stderr);
872 if stderr.contains("rate_limit")
873 || stderr.contains("429")
874 || stderr.contains("Too Many Requests")
875 {
876 return PreflightOutcome::RateLimited {
877 reason: stderr.trim().to_string(),
878 suggestion: "wait for the rate-limit window to reset",
879 };
880 }
881 return PreflightOutcome::Error(AppError::Validation(format!(
882 "preflight probe failed: {stderr}",
883 stderr = stderr.trim()
884 )));
885 }
886 PreflightOutcome::Healthy
887 }
888 }
889}
890
891fn wait_with_timeout(
893 mut child: std::process::Child,
894 timeout: std::time::Duration,
895) -> Result<std::process::Output, AppError> {
896 use wait_timeout::ChildExt;
897 let start = std::time::Instant::now();
898 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
899 if status.is_none() {
900 let _ = child.kill();
901 let _ = child.wait();
902 return Err(AppError::Validation(format!(
903 "preflight probe timed out after {}s",
904 start.elapsed().as_secs()
905 )));
906 }
907 let mut stdout = Vec::new();
908 if let Some(mut out) = child.stdout.take() {
909 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
910 }
911 let mut stderr = Vec::new();
912 if let Some(mut err) = child.stderr.take() {
913 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
914 }
915 let exit = status.unwrap();
916 Ok(std::process::Output {
917 status: exit,
918 stdout,
919 stderr,
920 })
921}
922
923fn scan_unbound_memories(
934 conn: &Connection,
935 namespace: &str,
936 limit: Option<usize>,
937 name_filter: &[String],
938) -> Result<Vec<(i64, String, String)>, AppError> {
939 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
940
941 if name_filter.is_empty() {
942 let sql = format!(
943 "SELECT m.id, m.name, m.body
944 FROM memories m
945 WHERE m.namespace = ?1
946 AND m.deleted_at IS NULL
947 AND NOT EXISTS (
948 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
949 )
950 ORDER BY m.id
951 {limit_clause}"
952 );
953 let mut stmt = conn.prepare(&sql)?;
954 let rows = stmt
955 .query_map(rusqlite::params![namespace], |r| {
956 Ok((
957 r.get::<_, i64>(0)?,
958 r.get::<_, String>(1)?,
959 r.get::<_, String>(2)?,
960 ))
961 })?
962 .collect::<Result<Vec<_>, _>>()?;
963 Ok(rows)
964 } else {
965 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
967 .map(|i| format!("?{i}"))
968 .collect();
969 let in_clause = placeholders.join(", ");
970 let sql = format!(
971 "SELECT m.id, m.name, m.body
972 FROM memories m
973 WHERE m.namespace = ?1
974 AND m.deleted_at IS NULL
975 AND m.name IN ({in_clause})
976 AND NOT EXISTS (
977 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
978 )
979 ORDER BY m.id
980 {limit_clause}"
981 );
982 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
983 params_vec.push(&namespace);
984 for n in name_filter {
985 params_vec.push(n);
986 }
987 let mut stmt = conn.prepare(&sql)?;
988 let rows = stmt
989 .query_map(
990 rusqlite::params_from_iter(params_vec.iter().copied()),
991 |r| {
992 Ok((
993 r.get::<_, i64>(0)?,
994 r.get::<_, String>(1)?,
995 r.get::<_, String>(2)?,
996 ))
997 },
998 )?
999 .collect::<Result<Vec<_>, _>>()?;
1000 Ok(rows)
1001 }
1002}
1003
1004fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
1009 let content = std::fs::read_to_string(path).map_err(|e| {
1010 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
1011 })?;
1012 let mut seen = std::collections::HashSet::new();
1013 let mut out = Vec::new();
1014 for line in content.lines() {
1015 let trimmed = line.trim();
1016 if trimmed.is_empty() || trimmed.starts_with('#') {
1017 continue;
1018 }
1019 if seen.insert(trimmed.to_string()) {
1020 out.push(trimmed.to_string());
1021 }
1022 }
1023 Ok(out)
1024}
1025
1026fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
1028 let mut combined: Vec<String> = args.names.clone();
1029 if let Some(p) = &args.names_file {
1030 let from_file = read_names_file(p)?;
1031 for n in from_file {
1032 if !combined.contains(&n) {
1033 combined.push(n);
1034 }
1035 }
1036 }
1037 Ok(combined)
1038}
1039
1040fn scan_entities_without_description(
1044 conn: &Connection,
1045 namespace: &str,
1046 limit: Option<usize>,
1047 name_filter: &[String],
1048) -> Result<Vec<(i64, String, String)>, AppError> {
1049 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1050
1051 if name_filter.is_empty() {
1052 let sql = format!(
1053 "SELECT id, name, type
1054 FROM entities
1055 WHERE namespace = ?1
1056 AND (description IS NULL OR description = '')
1057 ORDER BY id
1058 {limit_clause}"
1059 );
1060 let mut stmt = conn.prepare(&sql)?;
1061 let rows = stmt
1062 .query_map(rusqlite::params![namespace], |r| {
1063 Ok((
1064 r.get::<_, i64>(0)?,
1065 r.get::<_, String>(1)?,
1066 r.get::<_, String>(2)?,
1067 ))
1068 })?
1069 .collect::<Result<Vec<_>, _>>()?;
1070 Ok(rows)
1071 } else {
1072 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1073 .map(|i| format!("?{i}"))
1074 .collect();
1075 let in_clause = placeholders.join(", ");
1076 let sql = format!(
1077 "SELECT id, name, type
1078 FROM entities
1079 WHERE namespace = ?1
1080 AND name IN ({in_clause})
1081 AND (description IS NULL OR description = '')
1082 ORDER BY id
1083 {limit_clause}"
1084 );
1085 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1086 params_vec.push(&namespace);
1087 for n in name_filter {
1088 params_vec.push(n);
1089 }
1090 let mut stmt = conn.prepare(&sql)?;
1091 let rows = stmt
1092 .query_map(
1093 rusqlite::params_from_iter(params_vec.iter().copied()),
1094 |r| {
1095 Ok((
1096 r.get::<_, i64>(0)?,
1097 r.get::<_, String>(1)?,
1098 r.get::<_, String>(2)?,
1099 ))
1100 },
1101 )?
1102 .collect::<Result<Vec<_>, _>>()?;
1103 Ok(rows)
1104 }
1105}
1106
1107fn scan_short_body_memories(
1111 conn: &Connection,
1112 namespace: &str,
1113 min_chars: usize,
1114 limit: Option<usize>,
1115 name_filter: &[String],
1116) -> Result<Vec<(i64, String, String)>, AppError> {
1117 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1118
1119 if name_filter.is_empty() {
1120 let sql = format!(
1121 "SELECT m.id, m.name, m.body
1122 FROM memories m
1123 WHERE m.namespace = ?1
1124 AND m.deleted_at IS NULL
1125 AND LENGTH(COALESCE(m.body,'')) < ?2
1126 ORDER BY m.id
1127 {limit_clause}"
1128 );
1129 let mut stmt = conn.prepare(&sql)?;
1130 let rows = stmt
1131 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1132 Ok((
1133 r.get::<_, i64>(0)?,
1134 r.get::<_, String>(1)?,
1135 r.get::<_, String>(2)?,
1136 ))
1137 })?
1138 .collect::<Result<Vec<_>, _>>()?;
1139 Ok(rows)
1140 } else {
1141 let placeholders: Vec<String> = (3..=name_filter.len() + 2)
1142 .map(|i| format!("?{i}"))
1143 .collect();
1144 let in_clause = placeholders.join(", ");
1145 let sql = format!(
1146 "SELECT m.id, m.name, m.body
1147 FROM memories m
1148 WHERE m.namespace = ?1
1149 AND m.deleted_at IS NULL
1150 AND m.name IN ({in_clause})
1151 AND LENGTH(COALESCE(m.body,'')) < ?2
1152 ORDER BY m.id
1153 {limit_clause}"
1154 );
1155 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(2 + name_filter.len());
1156 let min_chars_i64 = min_chars as i64;
1157 params_vec.push(&namespace);
1158 params_vec.push(&min_chars_i64);
1159 for n in name_filter {
1160 params_vec.push(n);
1161 }
1162 let mut stmt = conn.prepare(&sql)?;
1163 let rows = stmt
1164 .query_map(
1165 rusqlite::params_from_iter(params_vec.iter().copied()),
1166 |r| {
1167 Ok((
1168 r.get::<_, i64>(0)?,
1169 r.get::<_, String>(1)?,
1170 r.get::<_, String>(2)?,
1171 ))
1172 },
1173 )?
1174 .collect::<Result<Vec<_>, _>>()?;
1175 Ok(rows)
1176 }
1177}
1178
1179fn scan_memories_without_embeddings(
1183 conn: &Connection,
1184 namespace: &str,
1185 limit: Option<usize>,
1186 name_filter: &[String],
1187) -> Result<Vec<(i64, String, String)>, AppError> {
1188 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1189
1190 if name_filter.is_empty() {
1191 let sql = format!(
1192 "SELECT m.id, m.name, COALESCE(m.body,'')
1193 FROM memories m
1194 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1195 WHERE m.namespace = ?1
1196 AND m.deleted_at IS NULL
1197 AND me.memory_id IS NULL
1198 ORDER BY m.id
1199 {limit_clause}"
1200 );
1201 let mut stmt = conn.prepare(&sql)?;
1202 let rows = stmt
1203 .query_map(rusqlite::params![namespace], |r| {
1204 Ok((
1205 r.get::<_, i64>(0)?,
1206 r.get::<_, String>(1)?,
1207 r.get::<_, String>(2)?,
1208 ))
1209 })?
1210 .collect::<Result<Vec<_>, _>>()?;
1211 Ok(rows)
1212 } else {
1213 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1214 .map(|i| format!("?{i}"))
1215 .collect();
1216 let in_clause = placeholders.join(", ");
1217 let sql = format!(
1218 "SELECT m.id, m.name, COALESCE(m.body,'')
1219 FROM memories m
1220 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1221 WHERE m.namespace = ?1
1222 AND m.deleted_at IS NULL
1223 AND m.name IN ({in_clause})
1224 AND me.memory_id IS NULL
1225 ORDER BY m.id
1226 {limit_clause}"
1227 );
1228 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1229 params_vec.push(&namespace);
1230 for n in name_filter {
1231 params_vec.push(n);
1232 }
1233 let mut stmt = conn.prepare(&sql)?;
1234 let rows = stmt
1235 .query_map(
1236 rusqlite::params_from_iter(params_vec.iter().copied()),
1237 |r| {
1238 Ok((
1239 r.get::<_, i64>(0)?,
1240 r.get::<_, String>(1)?,
1241 r.get::<_, String>(2)?,
1242 ))
1243 },
1244 )?
1245 .collect::<Result<Vec<_>, _>>()?;
1246 Ok(rows)
1247 }
1248}
1249
1250#[allow(clippy::type_complexity)]
1252fn scan_weight_candidates(
1253 conn: &Connection,
1254 namespace: &str,
1255 limit: Option<usize>,
1256) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1257 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1258 let sql = format!(
1259 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1260 FROM relationships r \
1261 JOIN entities e1 ON e1.id = r.source_id \
1262 JOIN entities e2 ON e2.id = r.target_id \
1263 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1264 ORDER BY r.weight DESC {limit_clause}"
1265 );
1266 let mut stmt = conn.prepare(&sql)?;
1267 let rows = stmt
1268 .query_map(rusqlite::params![namespace], |r| {
1269 Ok((
1270 r.get::<_, i64>(0)?,
1271 r.get::<_, String>(1)?,
1272 r.get::<_, String>(2)?,
1273 r.get::<_, String>(3)?,
1274 r.get::<_, f64>(4)?,
1275 ))
1276 })?
1277 .collect::<Result<Vec<_>, _>>()?;
1278 Ok(rows)
1279}
1280
1281fn scan_generic_relations(
1283 conn: &Connection,
1284 namespace: &str,
1285 limit: Option<usize>,
1286) -> Result<Vec<(i64, String, String, String)>, AppError> {
1287 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1288 let sql = format!(
1289 "SELECT r.id, e1.name, e2.name, r.relation \
1290 FROM relationships r \
1291 JOIN entities e1 ON e1.id = r.source_id \
1292 JOIN entities e2 ON e2.id = r.target_id \
1293 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1294 ORDER BY r.id {limit_clause}"
1295 );
1296 let mut stmt = conn.prepare(&sql)?;
1297 let rows = stmt
1298 .query_map(rusqlite::params![namespace], |r| {
1299 Ok((
1300 r.get::<_, i64>(0)?,
1301 r.get::<_, String>(1)?,
1302 r.get::<_, String>(2)?,
1303 r.get::<_, String>(3)?,
1304 ))
1305 })?
1306 .collect::<Result<Vec<_>, _>>()?;
1307 Ok(rows)
1308}
1309
1310fn persist_memory_bindings(
1319 conn: &Connection,
1320 namespace: &str,
1321 memory_id: i64,
1322 entities_json: &serde_json::Value,
1323 rels_json: &serde_json::Value,
1324) -> Result<(usize, usize), AppError> {
1325 #[derive(Deserialize)]
1326 struct EntityItem {
1327 name: String,
1328 entity_type: String,
1329 }
1330 #[derive(Deserialize)]
1331 struct RelItem {
1332 source: String,
1333 target: String,
1334 relation: String,
1335 strength: f64,
1336 }
1337
1338 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1339 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1340
1341 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1342 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1343
1344 let mut ent_count = 0usize;
1345 let mut rel_count = 0usize;
1346
1347 for item in &extracted_entities {
1348 let entity_type = match item.entity_type.parse::<EntityType>() {
1349 Ok(et) => et,
1350 Err(_) => {
1351 tracing::warn!(
1352 target: "enrich",
1353 entity = %item.name,
1354 entity_type = %item.entity_type,
1355 "entity type not recognized, skipping"
1356 );
1357 continue;
1358 }
1359 };
1360 match entities::upsert_entity(
1361 conn,
1362 namespace,
1363 &NewEntity {
1364 name: item.name.clone(),
1365 entity_type,
1366 description: None,
1367 },
1368 ) {
1369 Ok(eid) => {
1370 let _ = entities::link_memory_entity(conn, memory_id, eid);
1371 ent_count += 1;
1372 }
1373 Err(e) => {
1374 tracing::warn!(
1375 target: "enrich",
1376 entity = %item.name,
1377 error = %e,
1378 "entity upsert skipped"
1379 );
1380 }
1381 }
1382 }
1383
1384 for rel in &extracted_rels {
1385 let normalized = crate::parsers::normalize_relation(&rel.relation);
1386 crate::parsers::warn_if_non_canonical(&normalized);
1387
1388 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1391 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1392 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1393 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1394 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1395 let new_rel = NewRelationship {
1396 source: rel.source.clone(),
1397 target: rel.target.clone(),
1398 relation: normalized,
1399 strength: rel.strength,
1400 description: None,
1401 };
1402 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1403 rel_count += 1;
1404 }
1405 }
1406 }
1407
1408 Ok((ent_count, rel_count))
1409}
1410
1411fn persist_entity_description(
1413 conn: &Connection,
1414 entity_id: i64,
1415 description: &str,
1416) -> Result<(), AppError> {
1417 conn.execute(
1418 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1419 rusqlite::params![description, entity_id],
1420 )?;
1421 Ok(())
1422}
1423
1424#[allow(clippy::too_many_arguments)]
1430fn reembed_memory_vector(
1431 conn: &Connection,
1432 namespace: &str,
1433 memory_id: i64,
1434 memory_name: &str,
1435 memory_type: &str,
1436 body: &str,
1437 paths: &crate::paths::AppPaths,
1438 llm_backend: crate::cli::LlmBackendChoice,
1439 embedding_backend: crate::cli::EmbeddingBackendChoice,
1440) -> Result<(), AppError> {
1441 let snippet: String = body.chars().take(200).collect();
1442 let (embedding, backend_kind) = crate::embedder::embed_passage_with_embedding_choice(
1448 &paths.models,
1449 body,
1450 embedding_backend,
1451 llm_backend,
1452 )?;
1453 record_enrich_backend(backend_kind.as_str());
1454 memories::upsert_vec(
1455 conn,
1456 memory_id,
1457 namespace,
1458 memory_type,
1459 &embedding,
1460 memory_name,
1461 &snippet,
1462 )?;
1463 Ok(())
1464}
1465
1466fn record_enrich_backend(backend: &'static str) {
1472 if let Ok(mut guard) = ENRICH_LAST_BACKEND.lock() {
1473 *guard = Some(backend);
1474 }
1475}
1476
1477fn take_enrich_backend() -> Option<&'static str> {
1478 ENRICH_LAST_BACKEND.lock().ok().and_then(|mut g| g.take())
1479}
1480
/// Process-wide, mutex-guarded slot holding the last embedding backend name
/// passed to `record_enrich_backend` (called from `reembed_memory_vector`).
/// It is read-and-cleared by `take_enrich_backend`, whose result feeds the run
/// summary's `backend_invoked` field.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> = std::sync::Mutex::new(None);
1482
1483#[allow(clippy::too_many_arguments)]
1488fn persist_enriched_body(
1489 conn: &Connection,
1490 namespace: &str,
1491 memory_id: i64,
1492 memory_name: &str,
1493 new_body: &str,
1494 paths: &crate::paths::AppPaths,
1495 llm_backend: crate::cli::LlmBackendChoice,
1496 embedding_backend: crate::cli::EmbeddingBackendChoice,
1497) -> Result<(), AppError> {
1498 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1500 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1501 rusqlite::params![memory_id],
1502 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1503 )?;
1504
1505 let memory_type: String = conn.query_row(
1506 "SELECT type FROM memories WHERE id=?1",
1507 rusqlite::params![memory_id],
1508 |r| r.get(0),
1509 )?;
1510
1511 let description: String = conn.query_row(
1512 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1513 rusqlite::params![memory_id],
1514 |r| r.get(0),
1515 )?;
1516
1517 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1518
1519 let new_memory = memories::NewMemory {
1520 namespace: namespace.to_string(),
1521 name: memory_name.to_string(),
1522 memory_type: memory_type.clone(),
1523 description: description.clone(),
1524 body: new_body.to_string(),
1525 body_hash,
1526 session_id: None,
1527 source: "agent".to_string(),
1528 metadata: serde_json::json!({
1529 "operation": "body-enrich",
1530 "orig_chars": old_body.chars().count(),
1531 "new_chars": new_body.chars().count(),
1532 }),
1533 };
1534
1535 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1539 let version_metadata = serde_json::json!({
1540 "operation": "body-enrich",
1541 "orig_chars": old_body.chars().count(),
1542 "new_chars": new_body.chars().count(),
1543 })
1544 .to_string();
1545 crate::storage::versions::insert_version(
1546 conn,
1547 memory_id,
1548 next_version,
1549 memory_name,
1550 &memory_type,
1551 &description,
1552 new_body,
1553 &version_metadata,
1554 Some("enrich"),
1555 "edit",
1556 )?;
1557
1558 memories::update(conn, memory_id, &new_memory, None)?;
1559 memories::sync_fts_after_update(
1560 conn,
1561 memory_id,
1562 &old_name,
1563 &old_desc,
1564 &old_body,
1565 &new_memory.name,
1566 &new_memory.description,
1567 &new_memory.body,
1568 )?;
1569
1570 if let Err(e) = reembed_memory_vector(
1572 conn,
1573 namespace,
1574 memory_id,
1575 memory_name,
1576 &memory_type,
1577 new_body,
1578 paths,
1579 llm_backend,
1580 embedding_backend,
1581 ) {
1582 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1583 }
1584
1585 Ok(())
1586}
1587
/// Reports whether `value` equals `default`, i.e. whether a flag was left at its default.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1602
1603fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1618 const DEFAULT_TIMEOUT: u64 = 300;
1619
1620 let mut conflicts: Vec<String> = Vec::new();
1621
1622 match args.mode {
1623 EnrichMode::ClaudeCode => {
1624 if args.codex_binary.is_some() {
1625 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1626 }
1627 if args.codex_model.is_some() {
1628 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1629 }
1630 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1631 conflicts.push(format!(
1632 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1633 args.codex_timeout
1634 ));
1635 }
1636 }
1637 EnrichMode::Codex => {
1638 if args.claude_binary.is_some() {
1639 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1640 }
1641 if args.claude_model.is_some() {
1642 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1643 }
1644 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1645 conflicts.push(format!(
1646 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1647 args.claude_timeout
1648 ));
1649 }
1650 if args.max_cost_usd.is_some() {
1651 conflicts.push(
1652 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1653 .to_string(),
1654 );
1655 }
1656 }
1657 EnrichMode::Opencode => {
1658 if args.claude_binary.is_some() {
1659 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1660 }
1661 if args.claude_model.is_some() {
1662 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1663 }
1664 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1665 conflicts.push(format!(
1666 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1667 args.claude_timeout
1668 ));
1669 }
1670 if args.max_cost_usd.is_some() {
1671 conflicts.push(
1672 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1673 .to_string(),
1674 );
1675 }
1676 }
1677 }
1678
1679 if !conflicts.is_empty() {
1680 return Err(AppError::Validation(format!(
1681 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1682 args.mode,
1683 conflicts.join("\n - ")
1684 )));
1685 }
1686
1687 Ok(())
1688}
1689
1690pub fn run(
1694 args: &EnrichArgs,
1695 llm_backend: crate::cli::LlmBackendChoice,
1696 embedding_backend: crate::cli::EmbeddingBackendChoice,
1697) -> Result<(), AppError> {
1698 validate_mode_conditional_flags_enrich(args)?;
1701 let started = Instant::now();
1702
1703 let paths = AppPaths::resolve(args.db.as_deref())?;
1704 ensure_db_ready(&paths)?;
1705 let conn = open_rw(&paths.db)?;
1706 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1707
1708 let wait_secs = args.wait_job_singleton;
1714 let force_flag = args.force_job_singleton;
1715 let _singleton = crate::lock::acquire_job_singleton(
1716 crate::lock::JobType::Enrich,
1717 &namespace,
1718 &paths.db,
1719 wait_secs,
1720 force_flag,
1721 )?;
1722
1723 let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
1725 None
1726 } else {
1727 Some(match args.mode {
1728 EnrichMode::ClaudeCode => {
1729 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1730 let version = super::claude_runner::validate_claude_version(&bin)?;
1731 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1732 emit_json(&PhaseEvent {
1733 phase: "validate",
1734 binary_path: bin.to_str(),
1735 version: Some(&version),
1736 items_total: None,
1737 items_pending: None,
1738 llm_parallelism: None,
1739 });
1740 bin
1741 }
1742 EnrichMode::Codex => {
1743 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1744 emit_json(&PhaseEvent {
1745 phase: "validate",
1746 binary_path: bin.to_str(),
1747 version: None,
1748 items_total: None,
1749 items_pending: None,
1750 llm_parallelism: None,
1751 });
1752 bin
1753 }
1754 EnrichMode::Opencode => {
1755 let bin = super::opencode_runner::find_opencode_binary_with_override(
1756 args.opencode_binary.as_deref(),
1757 )?;
1758 emit_json(&PhaseEvent {
1759 phase: "validate",
1760 binary_path: bin.to_str(),
1761 version: None,
1762 items_total: None,
1763 items_pending: None,
1764 llm_parallelism: None,
1765 });
1766 bin
1767 }
1768 })
1769 };
1770
1771 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1775 let load = crate::system_load::load_average_one();
1776 let n = crate::system_load::ncpus();
1777 return Err(AppError::Validation(format!(
1778 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1779 pass --no-max-load-check to override (not recommended)"
1780 )));
1781 }
1782
1783 if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
1790 {
1791 let preflight_result = run_preflight_probe(args);
1792 match preflight_result {
1793 PreflightOutcome::Healthy => {
1794 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1795 }
1796 PreflightOutcome::RateLimited { reason, suggestion } => {
1797 if let Some(fallback) = args.fallback_mode.clone() {
1798 if fallback != args.mode {
1799 return Err(AppError::Validation(format!(
1809 "preflight detected rate limit on {mode:?}: {reason}; \
1810 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1811 mode = args.mode
1812 )));
1813 }
1814 return Err(AppError::Validation(format!(
1815 "preflight detected rate limit on {mode:?}: {reason}; \
1816 --fallback-mode matches --mode, no recovery possible",
1817 mode = args.mode
1818 )));
1819 }
1820 return Err(AppError::Validation(format!(
1821 "preflight detected rate limit on {mode:?}: {reason}; \
1822 {suggestion}; pass --fallback-mode codex to recover",
1823 mode = args.mode
1824 )));
1825 }
1826 PreflightOutcome::Error(e) => {
1827 return Err(e);
1828 }
1829 }
1830 }
1831
1832 let scan_result = scan_operation(&conn, &namespace, args)?;
1834 let total = scan_result.len();
1835
1836 emit_json(&PhaseEvent {
1837 phase: "scan",
1838 binary_path: None,
1839 version: None,
1840 items_total: Some(total),
1841 items_pending: Some(total),
1842 llm_parallelism: Some(args.llm_parallelism),
1843 });
1844
1845 if args.dry_run {
1847 for (idx, key) in scan_result.iter().enumerate() {
1848 emit_json(&ItemEvent {
1849 item: key,
1850 status: "preview",
1851 memory_id: None,
1852 entity_id: None,
1853 entities: None,
1854 rels: None,
1855 chars_before: None,
1856 chars_after: None,
1857 cost_usd: None,
1858 elapsed_ms: None,
1859 error: None,
1860 index: idx,
1861 total,
1862 });
1863 }
1864 emit_json(&EnrichSummary {
1865 summary: true,
1866 operation: format!("{:?}", args.operation),
1867 items_total: total,
1868 completed: 0,
1869 failed: 0,
1870 skipped: 0,
1871 cost_usd: 0.0,
1872 elapsed_ms: started.elapsed().as_millis() as u64,
1873 backend_invoked: take_enrich_backend(),
1874 });
1875 return Ok(());
1876 }
1877
1878 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1882
1883 if args.resume {
1884 let reset = queue_conn
1885 .execute(
1886 "UPDATE queue SET status='pending' WHERE status='processing'",
1887 [],
1888 )
1889 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1890 if reset > 0 {
1891 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1892 }
1893 }
1894
1895 if args.retry_failed {
1896 let count = queue_conn
1897 .execute(
1898 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1899 [],
1900 )
1901 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1902 tracing::info!(target: "enrich", count, "retrying failed items");
1903 }
1904
1905 if !args.resume && !args.retry_failed {
1906 queue_conn
1907 .execute("DELETE FROM queue", [])
1908 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1909 }
1910
1911 for (idx, key) in scan_result.iter().enumerate() {
1913 let item_type = match args.operation {
1914 EnrichOperation::EntityDescriptions => "entity",
1915 _ => "memory",
1916 };
1917 if let Err(e) = queue_conn.execute(
1918 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1919 rusqlite::params![key, item_type],
1920 ) {
1921 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1922 }
1923 let _ = idx; }
1925
1926 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1929 if parallelism > 1 {
1930 tracing::info!(
1931 target: "enrich",
1932 llm_parallelism = parallelism,
1933 "parallel LLM processing with bounded thread pool"
1934 );
1935 }
1936 if parallelism > 4 {
1940 match args.mode {
1941 EnrichMode::ClaudeCode => {
1942 tracing::warn!(
1943 target: "enrich",
1944 llm_parallelism = parallelism,
1945 recommended_max = 4,
1946 mode = "claude-code",
1947 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1948 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1949 to cut MCP children (G28-A)"
1950 );
1951 }
1952 EnrichMode::Codex if parallelism > 16 => {
1953 tracing::warn!(
1954 target: "enrich",
1955 llm_parallelism = parallelism,
1956 recommended_max = 16,
1957 mode = "codex",
1958 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1959 consider --llm-parallelism 8 for safer concurrency"
1960 );
1961 }
1962 EnrichMode::Codex => {
1963 }
1967 EnrichMode::Opencode if parallelism > 16 => {
1968 tracing::warn!(
1969 target: "enrich",
1970 llm_parallelism = parallelism,
1971 recommended_max = 16,
1972 mode = "opencode",
1973 "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
1974 consider --llm-parallelism 8 for safer concurrency"
1975 );
1976 }
1977 EnrichMode::Opencode => {
1978 }
1980 }
1981 }
1982
1983 let mut completed = 0usize;
1984 let mut failed = 0usize;
1985 let mut skipped = 0usize;
1986 let mut cost_total = 0.0f64;
1987 let mut oauth_detected = false;
1988 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1989 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1990 let enrich_started = std::time::Instant::now();
1991
1992 let provider_timeout = match args.mode {
1993 EnrichMode::ClaudeCode => args.claude_timeout,
1994 EnrichMode::Codex => args.codex_timeout,
1995 EnrichMode::Opencode => args.opencode_timeout,
1996 };
1997
1998 let provider_model: Option<&str> = match args.mode {
1999 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
2000 EnrichMode::Codex => args.codex_model.as_deref(),
2001 EnrichMode::Opencode => args.opencode_model.as_deref(),
2002 };
2003
2004 if parallelism > 1 {
2008 let stdout_mu = parking_lot::Mutex::new(());
2009 let budget = args.max_cost_usd;
2010 let operation = args.operation.clone();
2011 let mode = args.mode.clone();
2012 let min_oc = args.min_output_chars;
2013 let max_oc = args.max_output_chars;
2014 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
2015
2016 struct WorkerResult {
2017 completed: usize,
2018 failed: usize,
2019 skipped: usize,
2020 cost: f64,
2021 oauth: bool,
2022 }
2023
2024 let results: Vec<WorkerResult> = std::thread::scope(|s| {
2025 let handles: Vec<_> = (0..parallelism)
2026 .map(|worker_id| {
2027 let stdout_mu = &stdout_mu;
2028 let paths = &paths;
2029 let namespace = &namespace;
2030 let provider_binary = provider_binary.as_deref();
2031 let operation = &operation;
2032 let mode = &mode;
2033 let prompt_tpl = prompt_tpl.as_deref();
2034 s.spawn(move || {
2035 let w_conn = match open_rw(&paths.db) {
2036 Ok(c) => c,
2037 Err(e) => {
2038 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
2039 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2040 }
2041 };
2042 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
2043 Ok(c) => c,
2044 Err(e) => {
2045 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
2046 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2047 }
2048 };
2049 let mut w_completed = 0usize;
2050 let mut w_failed = 0usize;
2051 let mut w_skipped = 0usize;
2052 let mut w_cost = 0.0f64;
2053 let mut w_oauth = false;
2054 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2055 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2056 let mut w_breaker = crate::retry::CircuitBreaker::new(
2062 args.circuit_breaker_threshold.max(1),
2063 std::time::Duration::from_secs(60),
2064 );
2065
2066 loop {
2067 if crate::shutdown_requested() {
2068 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2069 break;
2070 }
2071 if let Some(b) = budget {
2072 if !w_oauth && w_cost >= b {
2073 break;
2074 }
2075 }
2076 let pending: Option<(i64, String, String)> = w_queue
2077 .query_row(
2078 "UPDATE queue SET status='processing', attempt=attempt+1 \
2079 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2080 RETURNING id, item_key, item_type",
2081 [],
2082 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2083 )
2084 .ok();
2085 let (queue_id, item_key, _item_type) = match pending {
2086 Some(p) => p,
2087 None => break,
2088 };
2089 let item_started = Instant::now();
2090 let current_index = w_completed + w_failed + w_skipped;
2091
2092 let call_result = match operation {
2093 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2094 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2095 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
2096 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
2097 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2098 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2099 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2100 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2101 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2102 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2103 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2104 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2105 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2106 };
2107
2108 match call_result {
2109 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2110 if is_oauth { w_oauth = true; }
2111 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2112 let _ = w_queue.execute(
2113 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2114 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2115 );
2116 w_completed += 1;
2117 if !is_oauth { w_cost += cost; }
2118 let _ = w_breaker
2120 .record(crate::retry::AttemptOutcome::Success);
2121 let _guard = stdout_mu.lock();
2122 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2123 }
2124 Ok(EnrichItemResult::Skipped { reason }) => {
2125 w_skipped += 1;
2126 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2127 let _guard = stdout_mu.lock();
2128 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2129 }
2130 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2131 w_skipped += 1;
2137 let reason = format!(
2138 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2139 );
2140 let _ = w_queue.execute(
2141 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2142 rusqlite::params![reason, queue_id],
2143 );
2144 let _guard = stdout_mu.lock();
2145 emit_json(&ItemEvent {
2146 item: &item_key,
2147 status: "preservation_failed",
2148 memory_id: None,
2149 entity_id: None,
2150 entities: None,
2151 rels: None,
2152 chars_before: Some(chars_before),
2153 chars_after: Some(chars_after),
2154 cost_usd: None,
2155 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2156 error: Some(reason),
2157 index: current_index,
2158 total,
2159 });
2160 }
2161 Err(e) => {
2162 let err_str = format!("{e}");
2163 if matches!(e, AppError::RateLimited { .. }) {
2164 if crate::retry::is_kill_switch_active() {
2165 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2166 } else if std::time::Instant::now() >= w_deadline {
2167 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2168 } else {
2169 let half = w_backoff / 2;
2170 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2171 let actual_wait = half + jitter;
2172 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2173 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2174 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2175 w_backoff = (w_backoff * 2).min(900);
2176 continue;
2177 }
2178 }
2179 w_failed += 1;
2180 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
2181 let _guard = stdout_mu.lock();
2182 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2183 let breaker_opened = w_breaker
2185 .record(crate::retry::AttemptOutcome::HardFailure);
2186 if breaker_opened {
2187 tracing::error!(target: "enrich",
2188 consecutive_failures = w_breaker.consecutive_failures(),
2189 "circuit breaker opened — aborting worker"
2190 );
2191 break;
2192 }
2193 }
2194 }
2195 }
2196 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2197 })
2198 })
2199 .collect();
2200 handles
2201 .into_iter()
2202 .map(|h| {
2203 h.join().unwrap_or(WorkerResult {
2204 completed: 0,
2205 failed: 0,
2206 skipped: 0,
2207 cost: 0.0,
2208 oauth: false,
2209 })
2210 })
2211 .collect()
2212 });
2213
2214 for r in &results {
2215 completed += r.completed;
2216 failed += r.failed;
2217 skipped += r.skipped;
2218 cost_total += r.cost;
2219 if r.oauth && !oauth_detected {
2220 oauth_detected = true;
2221 }
2222 }
2223 } else {
2224 loop {
2226 if crate::shutdown_requested() {
2227 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2228 break;
2229 }
2230
2231 if let Some(budget) = args.max_cost_usd {
2233 if !oauth_detected && cost_total >= budget {
2234 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2235 break;
2236 }
2237 }
2238
2239 let pending: Option<(i64, String, String)> = queue_conn
2241 .query_row(
2242 "UPDATE queue SET status='processing', attempt=attempt+1 \
2243 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
2244 RETURNING id, item_key, item_type",
2245 [],
2246 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2247 )
2248 .ok();
2249
2250 let (queue_id, item_key, item_type) = match pending {
2251 Some(p) => p,
2252 None => break,
2253 };
2254
2255 let item_started = Instant::now();
2256 let current_index = completed + failed + skipped;
2257
2258 let call_result = match args.operation {
2259 EnrichOperation::MemoryBindings => call_memory_bindings(
2260 &conn,
2261 &namespace,
2262 &item_key,
2263 provider_binary
2264 .as_deref()
2265 .expect("provider binary required"),
2266 provider_model,
2267 provider_timeout,
2268 &args.mode,
2269 ),
2270 EnrichOperation::EntityDescriptions => call_entity_description(
2271 &conn,
2272 &namespace,
2273 &item_key,
2274 provider_binary
2275 .as_deref()
2276 .expect("provider binary required"),
2277 provider_model,
2278 provider_timeout,
2279 &args.mode,
2280 ),
2281 EnrichOperation::BodyEnrich => call_body_enrich(
2282 &conn,
2283 &namespace,
2284 &item_key,
2285 provider_binary
2286 .as_deref()
2287 .expect("provider binary required"),
2288 provider_model,
2289 provider_timeout,
2290 &args.mode,
2291 args.min_output_chars,
2292 args.max_output_chars,
2293 args.prompt_template.as_deref(),
2294 args.preserve_threshold,
2295 &paths,
2296 llm_backend,
2297 embedding_backend,
2298 ),
2299 EnrichOperation::ReEmbed => call_reembed(
2300 &conn,
2301 &namespace,
2302 &item_key,
2303 &paths,
2304 llm_backend,
2305 embedding_backend,
2306 ),
2307 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2308 &conn,
2309 &namespace,
2310 &item_key,
2311 provider_binary
2312 .as_deref()
2313 .expect("provider binary required"),
2314 provider_model,
2315 provider_timeout,
2316 &args.mode,
2317 ),
2318 EnrichOperation::RelationReclassify => call_relation_reclassify(
2319 &conn,
2320 &namespace,
2321 &item_key,
2322 provider_binary
2323 .as_deref()
2324 .expect("provider binary required"),
2325 provider_model,
2326 provider_timeout,
2327 &args.mode,
2328 ),
2329 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2330 call_entity_connect(
2331 &conn,
2332 &namespace,
2333 &item_key,
2334 provider_binary
2335 .as_deref()
2336 .expect("provider binary required"),
2337 provider_model,
2338 provider_timeout,
2339 &args.mode,
2340 )
2341 }
2342 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2343 &conn,
2344 &namespace,
2345 &item_key,
2346 provider_binary
2347 .as_deref()
2348 .expect("provider binary required"),
2349 provider_model,
2350 provider_timeout,
2351 &args.mode,
2352 ),
2353 EnrichOperation::DescriptionEnrich => call_description_enrich(
2354 &conn,
2355 &namespace,
2356 &item_key,
2357 provider_binary
2358 .as_deref()
2359 .expect("provider binary required"),
2360 provider_model,
2361 provider_timeout,
2362 &args.mode,
2363 ),
2364 EnrichOperation::DomainClassify => call_domain_classify(
2365 &conn,
2366 &namespace,
2367 &item_key,
2368 provider_binary
2369 .as_deref()
2370 .expect("provider binary required"),
2371 provider_model,
2372 provider_timeout,
2373 &args.mode,
2374 ),
2375 EnrichOperation::GraphAudit => call_graph_audit(
2376 &conn,
2377 &namespace,
2378 &item_key,
2379 provider_binary
2380 .as_deref()
2381 .expect("provider binary required"),
2382 provider_model,
2383 provider_timeout,
2384 &args.mode,
2385 ),
2386 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2387 &conn,
2388 &namespace,
2389 &item_key,
2390 provider_binary
2391 .as_deref()
2392 .expect("provider binary required"),
2393 provider_model,
2394 provider_timeout,
2395 &args.mode,
2396 ),
2397 EnrichOperation::BodyExtract => call_body_extract(
2398 &conn,
2399 &namespace,
2400 &item_key,
2401 provider_binary
2402 .as_deref()
2403 .expect("provider binary required"),
2404 provider_model,
2405 provider_timeout,
2406 &args.mode,
2407 ),
2408 };
2409
2410 match call_result {
2411 Ok(EnrichItemResult::Done {
2412 memory_id,
2413 entity_id,
2414 entities,
2415 rels,
2416 chars_before,
2417 chars_after,
2418 cost,
2419 is_oauth,
2420 }) => {
2421 if is_oauth && !oauth_detected {
2422 oauth_detected = true;
2423 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2424 }
2425 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2426
2427 let persist_err: Option<String> = match args.operation {
2429 EnrichOperation::MemoryBindings => {
2430 None
2432 }
2433 EnrichOperation::EntityDescriptions => {
2434 None
2436 }
2437 EnrichOperation::BodyEnrich => {
2438 None
2440 }
2441 _ => {
2442 None
2444 }
2445 };
2446
2447 if let Err(e) = queue_conn.execute(
2448 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2449 rusqlite::params![
2450 memory_id,
2451 entity_id,
2452 entities as i64,
2453 rels as i64,
2454 cost,
2455 item_started.elapsed().as_millis() as i64,
2456 queue_id
2457 ],
2458 ) {
2459 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2460 }
2461
2462 if persist_err.is_none() {
2463 completed += 1;
2464 if !is_oauth {
2465 cost_total += cost;
2466 }
2467 emit_json(&ItemEvent {
2468 item: &item_key,
2469 status: "done",
2470 memory_id,
2471 entity_id,
2472 entities: Some(entities),
2473 rels: Some(rels),
2474 chars_before,
2475 chars_after,
2476 cost_usd: if is_oauth { None } else { Some(cost) },
2477 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2478 error: None,
2479 index: current_index,
2480 total,
2481 });
2482 } else {
2483 failed += 1;
2484 emit_json(&ItemEvent {
2485 item: &item_key,
2486 status: "failed",
2487 memory_id: None,
2488 entity_id: None,
2489 entities: None,
2490 rels: None,
2491 chars_before: None,
2492 chars_after: None,
2493 cost_usd: None,
2494 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2495 error: persist_err,
2496 index: current_index,
2497 total,
2498 });
2499 }
2500 }
2501 Ok(EnrichItemResult::Skipped { reason }) => {
2502 skipped += 1;
2503 if let Err(e) = queue_conn.execute(
2504 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2505 rusqlite::params![reason, queue_id],
2506 ) {
2507 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2508 }
2509 emit_json(&ItemEvent {
2510 item: &item_key,
2511 status: "skipped",
2512 memory_id: None,
2513 entity_id: None,
2514 entities: None,
2515 rels: None,
2516 chars_before: None,
2517 chars_after: None,
2518 cost_usd: None,
2519 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2520 error: None,
2521 index: current_index,
2522 total,
2523 });
2524 }
2525 Ok(EnrichItemResult::PreservationFailed {
2526 score,
2527 threshold,
2528 chars_before,
2529 chars_after,
2530 }) => {
2531 skipped += 1;
2538 let reason = format!(
2539 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2540 );
2541 if let Err(qe) = queue_conn.execute(
2542 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2543 rusqlite::params![reason, queue_id],
2544 ) {
2545 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2546 }
2547 emit_json(&ItemEvent {
2548 item: &item_key,
2549 status: "preservation_failed",
2550 memory_id: None,
2551 entity_id: None,
2552 entities: None,
2553 rels: None,
2554 chars_before: Some(chars_before),
2555 chars_after: Some(chars_after),
2556 cost_usd: None,
2557 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2558 error: Some(reason),
2559 index: current_index,
2560 total,
2561 });
2562 }
2563 Err(e) => {
2564 let err_str = format!("{e}");
2565 if matches!(e, AppError::RateLimited { .. }) {
2566 if crate::retry::is_kill_switch_active() {
2567 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2568 } else if std::time::Instant::now() >= rate_limit_deadline {
2569 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2570 } else {
2571 let half = backoff_secs / 2;
2572 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2573 let actual_wait = half + jitter;
2574 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2575 if let Err(qe) = queue_conn.execute(
2576 "UPDATE queue SET status='pending' WHERE id=?1",
2577 rusqlite::params![queue_id],
2578 ) {
2579 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2580 }
2581 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2582 backoff_secs = (backoff_secs * 2).min(900);
2583 continue;
2584 }
2585 }
2586
2587 failed += 1;
2588 if let Err(qe) = queue_conn.execute(
2589 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2590 rusqlite::params![err_str, queue_id],
2591 ) {
2592 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2593 }
2594 emit_json(&ItemEvent {
2595 item: &item_key,
2596 status: "failed",
2597 memory_id: None,
2598 entity_id: None,
2599 entities: None,
2600 rels: None,
2601 chars_before: None,
2602 chars_after: None,
2603 cost_usd: None,
2604 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2605 error: Some(err_str),
2606 index: current_index,
2607 total,
2608 });
2609 }
2610 }
2611
2612 let _ = item_type; }
2614 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2617 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2618
2619 emit_json(&EnrichSummary {
2620 summary: true,
2621 operation: format!("{:?}", args.operation),
2622 items_total: total,
2623 completed,
2624 failed,
2625 skipped,
2626 cost_usd: cost_total,
2627 elapsed_ms: started.elapsed().as_millis() as u64,
2628 backend_invoked: take_enrich_backend(),
2629 });
2630
2631 if failed == 0 {
2632 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2633 }
2634
2635 Ok(())
2636}
2637
2638enum EnrichItemResult {
2643 Done {
2644 memory_id: Option<i64>,
2645 entity_id: Option<i64>,
2646 entities: usize,
2647 rels: usize,
2648 chars_before: Option<usize>,
2649 chars_after: Option<usize>,
2650 cost: f64,
2651 is_oauth: bool,
2652 },
2653 Skipped {
2654 reason: String,
2655 },
2656 PreservationFailed {
2661 score: f64,
2662 threshold: f64,
2663 chars_before: usize,
2664 chars_after: usize,
2665 },
2666}
2667
2668fn call_memory_bindings(
2673 conn: &Connection,
2674 namespace: &str,
2675 memory_name: &str,
2676 binary: &Path,
2677 model: Option<&str>,
2678 timeout: u64,
2679 mode: &EnrichMode,
2680) -> Result<EnrichItemResult, AppError> {
2681 let (memory_id, body): (i64, String) = conn.query_row(
2683 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2684 rusqlite::params![namespace, memory_name],
2685 |r| Ok((r.get(0)?, r.get(1)?)),
2686 ).map_err(|e| match e {
2687 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2688 other => AppError::Database(other),
2689 })?;
2690
2691 if body.trim().is_empty() {
2692 return Ok(EnrichItemResult::Skipped {
2693 reason: "body is empty".to_string(),
2694 });
2695 }
2696
2697 let (value, cost, is_oauth) = match mode {
2698 EnrichMode::ClaudeCode => call_claude(
2699 binary,
2700 BINDINGS_PROMPT,
2701 BINDINGS_SCHEMA,
2702 &body,
2703 model,
2704 timeout,
2705 )?,
2706 EnrichMode::Codex => call_codex(
2707 binary,
2708 BINDINGS_PROMPT,
2709 BINDINGS_SCHEMA,
2710 &body,
2711 model,
2712 timeout,
2713 )?,
2714 EnrichMode::Opencode => call_opencode(
2715 binary,
2716 BINDINGS_PROMPT,
2717 BINDINGS_SCHEMA,
2718 &body,
2719 model,
2720 timeout,
2721 )?,
2722 };
2723
2724 let empty_arr = serde_json::Value::Array(vec![]);
2725 let entities_val = value.get("entities").unwrap_or(&empty_arr);
2726 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2727
2728 let (ent_count, rel_count) =
2729 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2730
2731 Ok(EnrichItemResult::Done {
2732 memory_id: Some(memory_id),
2733 entity_id: None,
2734 entities: ent_count,
2735 rels: rel_count,
2736 chars_before: None,
2737 chars_after: None,
2738 cost,
2739 is_oauth,
2740 })
2741}
2742
2743fn call_entity_description(
2744 conn: &Connection,
2745 namespace: &str,
2746 entity_name: &str,
2747 binary: &Path,
2748 model: Option<&str>,
2749 timeout: u64,
2750 mode: &EnrichMode,
2751) -> Result<EnrichItemResult, AppError> {
2752 let (entity_id, entity_type): (i64, String) = conn
2753 .query_row(
2754 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2755 rusqlite::params![namespace, entity_name],
2756 |r| Ok((r.get(0)?, r.get(1)?)),
2757 )
2758 .map_err(|e| match e {
2759 rusqlite::Error::QueryReturnedNoRows => {
2760 AppError::NotFound(format!("entity '{entity_name}' not found"))
2761 }
2762 other => AppError::Database(other),
2763 })?;
2764
2765 let prompt = format!(
2766 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2767 );
2768
2769 let (value, cost, is_oauth) = match mode {
2770 EnrichMode::ClaudeCode => call_claude(
2771 binary,
2772 &prompt,
2773 ENTITY_DESCRIPTION_SCHEMA,
2774 "",
2775 model,
2776 timeout,
2777 )?,
2778 EnrichMode::Codex => call_codex(
2779 binary,
2780 &prompt,
2781 ENTITY_DESCRIPTION_SCHEMA,
2782 "",
2783 model,
2784 timeout,
2785 )?,
2786 EnrichMode::Opencode => call_opencode(
2787 binary,
2788 &prompt,
2789 ENTITY_DESCRIPTION_SCHEMA,
2790 "",
2791 model,
2792 timeout,
2793 )?,
2794 };
2795
2796 let description = value
2797 .get("description")
2798 .and_then(|v| v.as_str())
2799 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2800
2801 persist_entity_description(conn, entity_id, description)?;
2802
2803 Ok(EnrichItemResult::Done {
2804 memory_id: None,
2805 entity_id: Some(entity_id),
2806 entities: 0,
2807 rels: 0,
2808 chars_before: None,
2809 chars_after: None,
2810 cost,
2811 is_oauth,
2812 })
2813}
2814
2815#[allow(clippy::too_many_arguments)]
2816fn call_body_enrich(
2817 conn: &Connection,
2818 namespace: &str,
2819 memory_name: &str,
2820 binary: &Path,
2821 model: Option<&str>,
2822 timeout: u64,
2823 mode: &EnrichMode,
2824 min_output_chars: usize,
2825 max_output_chars: usize,
2826 prompt_template: Option<&Path>,
2827 preserve_threshold: f64,
2828 paths: &crate::paths::AppPaths,
2829 llm_backend: crate::cli::LlmBackendChoice,
2830 embedding_backend: crate::cli::EmbeddingBackendChoice,
2831) -> Result<EnrichItemResult, AppError> {
2832 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
2833 .query_row(
2834 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
2835 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2836 rusqlite::params![namespace, memory_name],
2837 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2838 )
2839 .map_err(|e| match e {
2840 rusqlite::Error::QueryReturnedNoRows => {
2841 AppError::NotFound(format!("memory '{memory_name}' not found"))
2842 }
2843 other => AppError::Database(other),
2844 })?;
2845
2846 let chars_before = body.chars().count();
2847
2848 let linked_entities: Vec<String> = {
2850 let mut stmt = conn.prepare_cached(
2851 "SELECT e.name FROM memory_entities me \
2852 JOIN entities e ON e.id = me.entity_id \
2853 WHERE me.memory_id = ?1 LIMIT 10",
2854 )?;
2855 let result: Vec<String> = stmt
2856 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
2857 .filter_map(|r| r.ok())
2858 .collect();
2859 drop(stmt);
2860 result
2861 };
2862
2863 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
2865 let file_size = std::fs::metadata(tmpl_path)
2866 .map_err(|e| {
2867 AppError::Io(std::io::Error::new(
2868 e.kind(),
2869 format!("failed to stat prompt template: {e}"),
2870 ))
2871 })?
2872 .len();
2873 if file_size > MAX_MEMORY_BODY_LEN as u64 {
2874 return Err(AppError::LimitExceeded(
2875 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
2876 ));
2877 }
2878 std::fs::read_to_string(tmpl_path).map_err(|e| {
2879 AppError::Io(std::io::Error::new(
2880 e.kind(),
2881 format!("failed to read prompt template: {e}"),
2882 ))
2883 })?
2884 } else {
2885 BODY_ENRICH_PROMPT_PREFIX.to_string()
2886 };
2887
2888 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
2890 let mut ctx = String::new();
2891 ctx.push_str(&format!(
2892 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
2893 ));
2894 if !description.is_empty() {
2895 ctx.push_str(&format!("- Description: {description}\n"));
2896 }
2897 ctx.push_str(&format!("- Domain: {namespace}\n"));
2898 if !linked_entities.is_empty() {
2899 ctx.push_str(&format!(
2900 "- Linked entities: {}\n",
2901 linked_entities.join(", ")
2902 ));
2903 }
2904 ctx
2905 } else {
2906 String::new()
2907 };
2908
2909 let prompt = format!(
2910 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
2911 );
2912
2913 let (value, cost, is_oauth) = match mode {
2915 EnrichMode::ClaudeCode => {
2916 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2917 }
2918 EnrichMode::Codex => {
2919 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2920 }
2921 EnrichMode::Opencode => {
2922 call_opencode(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
2923 }
2924 };
2925
2926 let enriched_body = value
2927 .get("enriched_body")
2928 .and_then(|v| v.as_str())
2929 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
2930
2931 let chars_after = enriched_body.chars().count();
2932
2933 let threshold = preserve_threshold;
2940 let verdict =
2941 crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
2942 if !verdict.is_accepted() {
2943 return Ok(EnrichItemResult::PreservationFailed {
2944 score: match verdict {
2945 crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
2946 crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
2947 crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
2948 },
2949 threshold,
2950 chars_before,
2951 chars_after,
2952 });
2953 }
2954
2955 let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
2961 let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
2962 if old_hash == new_hash {
2963 return Ok(EnrichItemResult::Skipped {
2964 reason: format!(
2965 "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
2966 ),
2967 });
2968 }
2969
2970 if chars_after <= chars_before {
2972 return Ok(EnrichItemResult::Skipped {
2973 reason: format!(
2974 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
2975 ),
2976 });
2977 }
2978
2979 persist_enriched_body(
2980 conn,
2981 namespace,
2982 memory_id,
2983 memory_name,
2984 enriched_body,
2985 paths,
2986 llm_backend,
2987 embedding_backend,
2988 )?;
2989
2990 Ok(EnrichItemResult::Done {
2991 memory_id: Some(memory_id),
2992 entity_id: None,
2993 entities: 0,
2994 rels: 0,
2995 chars_before: Some(chars_before),
2996 chars_after: Some(chars_after),
2997 cost,
2998 is_oauth,
2999 })
3000}
3001
3002fn call_reembed(
3003 conn: &Connection,
3004 namespace: &str,
3005 memory_name: &str,
3006 paths: &crate::paths::AppPaths,
3007 llm_backend: crate::cli::LlmBackendChoice,
3008 embedding_backend: crate::cli::EmbeddingBackendChoice,
3009) -> Result<EnrichItemResult, AppError> {
3010 let (memory_id, body, memory_type): (i64, String, String) = conn
3011 .query_row(
3012 "SELECT id, COALESCE(body,''), COALESCE(type,'note')
3013 FROM memories
3014 WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
3015 rusqlite::params![namespace, memory_name],
3016 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3017 )
3018 .map_err(|e| match e {
3019 rusqlite::Error::QueryReturnedNoRows => {
3020 AppError::NotFound(format!("memory '{memory_name}' not found"))
3021 }
3022 other => AppError::Database(other),
3023 })?;
3024
3025 if body.trim().is_empty() {
3026 return Ok(EnrichItemResult::Skipped {
3027 reason: "body is empty".to_string(),
3028 });
3029 }
3030
3031 reembed_memory_vector(
3032 conn,
3033 namespace,
3034 memory_id,
3035 memory_name,
3036 &memory_type,
3037 &body,
3038 paths,
3039 llm_backend,
3040 embedding_backend,
3041 )?;
3042
3043 Ok(EnrichItemResult::Done {
3044 memory_id: Some(memory_id),
3045 entity_id: None,
3046 entities: 0,
3047 rels: 0,
3048 chars_before: Some(body.chars().count()),
3049 chars_after: Some(body.chars().count()),
3050 cost: 0.0,
3051 is_oauth: true,
3052 })
3053}
3054
3055fn scan_operation(
3060 conn: &Connection,
3061 namespace: &str,
3062 args: &EnrichArgs,
3063) -> Result<Vec<String>, AppError> {
3064 let name_filter = resolve_name_filter(args)?;
3066 match args.operation {
3067 EnrichOperation::MemoryBindings => {
3068 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
3069 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3070 }
3071 EnrichOperation::EntityDescriptions => {
3072 let rows =
3073 scan_entities_without_description(conn, namespace, args.limit, &name_filter)?;
3074 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3075 }
3076 EnrichOperation::BodyEnrich => {
3077 let rows = scan_short_body_memories(
3078 conn,
3079 namespace,
3080 args.min_output_chars,
3081 args.limit,
3082 &name_filter,
3083 )?;
3084 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3085 }
3086 EnrichOperation::ReEmbed => {
3087 let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
3088 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3089 }
3090 EnrichOperation::WeightCalibrate => {
3091 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
3092 Ok(rows
3093 .into_iter()
3094 .map(|(id, _, _, _, _)| id.to_string())
3095 .collect())
3096 }
3097 EnrichOperation::RelationReclassify => {
3098 let rows = scan_generic_relations(conn, namespace, args.limit)?;
3099 Ok(rows
3100 .into_iter()
3101 .map(|(id, _, _, _)| id.to_string())
3102 .collect())
3103 }
3104 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
3105 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
3106 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
3107 }
3108 EnrichOperation::EntityTypeValidate => {
3109 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
3110 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3111 }
3112 EnrichOperation::DescriptionEnrich => {
3113 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
3114 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3115 }
3116 EnrichOperation::DomainClassify
3117 | EnrichOperation::GraphAudit
3118 | EnrichOperation::DeepResearchSynth
3119 | EnrichOperation::BodyExtract => {
3120 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3121 let sql = format!(
3122 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
3123 );
3124 let mut stmt = conn.prepare(&sql)?;
3125 let names = stmt
3126 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
3127 .collect::<Result<Vec<_>, _>>()?;
3128 Ok(names)
3129 }
3130 }
3131}
3132
3133fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
3139 if let Some(p) = explicit {
3140 if p.exists() {
3141 return Ok(p.to_path_buf());
3142 }
3143 return Err(AppError::Validation(format!(
3144 "Codex binary not found at explicit path: {}",
3145 p.display()
3146 )));
3147 }
3148
3149 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
3150 let p = PathBuf::from(&env_path);
3151 if p.exists() {
3152 return Ok(p);
3153 }
3154 }
3155
3156 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
3157 if let Some(path_var) = std::env::var_os("PATH") {
3158 for dir in std::env::split_paths(&path_var) {
3159 let candidate = dir.join(name);
3160 if candidate.exists() {
3161 return Ok(crate::extract::llm_embedding::resolve_real_binary(
3162 &candidate,
3163 ));
3164 }
3165 }
3166 }
3167
3168 Err(AppError::Validation(
3169 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
3170 ))
3171}
3172
3173fn call_weight_calibrate(
3175 conn: &Connection,
3176 _namespace: &str,
3177 item_key: &str,
3178 binary: &Path,
3179 model: Option<&str>,
3180 timeout: u64,
3181 mode: &EnrichMode,
3182) -> Result<EnrichItemResult, AppError> {
3183 let rel_id: i64 = item_key
3184 .parse()
3185 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3186 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
3187 .query_row(
3188 "SELECT e1.name, e2.name, r.relation, r.weight \
3189 FROM relationships r \
3190 JOIN entities e1 ON e1.id = r.source_id \
3191 JOIN entities e2 ON e2.id = r.target_id \
3192 WHERE r.id = ?1",
3193 rusqlite::params![rel_id],
3194 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3195 )
3196 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3197
3198 let input_text = format!(
3199 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
3200 );
3201 let (value, cost, is_oauth) = match mode {
3202 EnrichMode::ClaudeCode => call_claude(
3203 binary,
3204 WEIGHT_CALIBRATE_PROMPT,
3205 WEIGHT_CALIBRATE_SCHEMA,
3206 &input_text,
3207 model,
3208 timeout,
3209 )?,
3210 EnrichMode::Codex => call_codex(
3211 binary,
3212 WEIGHT_CALIBRATE_PROMPT,
3213 WEIGHT_CALIBRATE_SCHEMA,
3214 &input_text,
3215 model,
3216 timeout,
3217 )?,
3218 EnrichMode::Opencode => call_opencode(
3219 binary,
3220 WEIGHT_CALIBRATE_PROMPT,
3221 WEIGHT_CALIBRATE_SCHEMA,
3222 &input_text,
3223 model,
3224 timeout,
3225 )?,
3226 };
3227
3228 let calibrated = value
3229 .get("calibrated_weight")
3230 .and_then(|v| v.as_f64())
3231 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
3232
3233 conn.execute(
3234 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
3235 rusqlite::params![calibrated, rel_id],
3236 )?;
3237
3238 Ok(EnrichItemResult::Done {
3239 memory_id: None,
3240 entity_id: None,
3241 entities: 0,
3242 rels: 1,
3243 chars_before: None,
3244 chars_after: None,
3245 cost,
3246 is_oauth,
3247 })
3248}
3249
3250fn call_relation_reclassify(
3252 conn: &Connection,
3253 _namespace: &str,
3254 item_key: &str,
3255 binary: &Path,
3256 model: Option<&str>,
3257 timeout: u64,
3258 mode: &EnrichMode,
3259) -> Result<EnrichItemResult, AppError> {
3260 let rel_id: i64 = item_key
3261 .parse()
3262 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3263 let (source_name, target_name, current_relation): (String, String, String) = conn
3264 .query_row(
3265 "SELECT e1.name, e2.name, r.relation \
3266 FROM relationships r \
3267 JOIN entities e1 ON e1.id = r.source_id \
3268 JOIN entities e2 ON e2.id = r.target_id \
3269 WHERE r.id = ?1",
3270 rusqlite::params![rel_id],
3271 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3272 )
3273 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3274
3275 let input_text = format!(
3276 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3277 );
3278 let (value, cost, is_oauth) = match mode {
3279 EnrichMode::ClaudeCode => call_claude(
3280 binary,
3281 RELATION_RECLASSIFY_PROMPT,
3282 RELATION_RECLASSIFY_SCHEMA,
3283 &input_text,
3284 model,
3285 timeout,
3286 )?,
3287 EnrichMode::Codex => call_codex(
3288 binary,
3289 RELATION_RECLASSIFY_PROMPT,
3290 RELATION_RECLASSIFY_SCHEMA,
3291 &input_text,
3292 model,
3293 timeout,
3294 )?,
3295 EnrichMode::Opencode => call_opencode(
3296 binary,
3297 RELATION_RECLASSIFY_PROMPT,
3298 RELATION_RECLASSIFY_SCHEMA,
3299 &input_text,
3300 model,
3301 timeout,
3302 )?,
3303 };
3304
3305 let new_relation = value
3306 .get("relation")
3307 .and_then(|v| v.as_str())
3308 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3309 let new_strength = value
3310 .get("strength")
3311 .and_then(|v| v.as_f64())
3312 .unwrap_or(0.5);
3313
3314 conn.execute(
3315 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3316 rusqlite::params![new_relation, new_strength, rel_id],
3317 )?;
3318
3319 Ok(EnrichItemResult::Done {
3320 memory_id: None,
3321 entity_id: None,
3322 entities: 0,
3323 rels: 1,
3324 chars_before: None,
3325 chars_after: None,
3326 cost,
3327 is_oauth,
3328 })
3329}
3330
3331fn call_entity_connect(
3333 conn: &Connection,
3334 namespace: &str,
3335 item_key: &str,
3336 binary: &Path,
3337 model: Option<&str>,
3338 timeout: u64,
3339 mode: &EnrichMode,
3340) -> Result<EnrichItemResult, AppError> {
3341 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3342 let (e1_id, e1_name, e2_id, e2_name) =
3343 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3344 Some(p) => p,
3345 None => {
3346 return Ok(EnrichItemResult::Skipped {
3347 reason: "pair no longer isolated".into(),
3348 })
3349 }
3350 };
3351 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3352 let (value, cost, is_oauth) = match mode {
3353 EnrichMode::ClaudeCode => call_claude(
3354 binary,
3355 ENTITY_CONNECT_PROMPT,
3356 ENTITY_CONNECT_SCHEMA,
3357 &input_text,
3358 model,
3359 timeout,
3360 )?,
3361 EnrichMode::Codex => call_codex(
3362 binary,
3363 ENTITY_CONNECT_PROMPT,
3364 ENTITY_CONNECT_SCHEMA,
3365 &input_text,
3366 model,
3367 timeout,
3368 )?,
3369 EnrichMode::Opencode => call_opencode(
3370 binary,
3371 ENTITY_CONNECT_PROMPT,
3372 ENTITY_CONNECT_SCHEMA,
3373 &input_text,
3374 model,
3375 timeout,
3376 )?,
3377 };
3378 let relation = value
3379 .get("relation")
3380 .and_then(|v| v.as_str())
3381 .unwrap_or("none");
3382 if relation == "none" {
3383 return Ok(EnrichItemResult::Skipped {
3384 reason: "LLM determined no relationship".into(),
3385 });
3386 }
3387 let strength = value
3388 .get("strength")
3389 .and_then(|v| v.as_f64())
3390 .unwrap_or(0.5);
3391 conn.execute(
3392 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3393 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3394 )?;
3395 Ok(EnrichItemResult::Done {
3396 memory_id: None,
3397 entity_id: None,
3398 entities: 0,
3399 rels: 1,
3400 chars_before: None,
3401 chars_after: None,
3402 cost,
3403 is_oauth,
3404 })
3405}
3406
3407fn call_entity_type_validate(
3409 conn: &Connection,
3410 _namespace: &str,
3411 item_key: &str,
3412 binary: &Path,
3413 model: Option<&str>,
3414 timeout: u64,
3415 mode: &EnrichMode,
3416) -> Result<EnrichItemResult, AppError> {
3417 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3418 .query_row(
3419 "SELECT id, name, type FROM entities WHERE name = ?1",
3420 rusqlite::params![item_key],
3421 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3422 )
3423 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3424 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3425 let (value, cost, is_oauth) = match mode {
3426 EnrichMode::ClaudeCode => call_claude(
3427 binary,
3428 ENTITY_TYPE_VALIDATE_PROMPT,
3429 ENTITY_TYPE_VALIDATE_SCHEMA,
3430 &input_text,
3431 model,
3432 timeout,
3433 )?,
3434 EnrichMode::Codex => call_codex(
3435 binary,
3436 ENTITY_TYPE_VALIDATE_PROMPT,
3437 ENTITY_TYPE_VALIDATE_SCHEMA,
3438 &input_text,
3439 model,
3440 timeout,
3441 )?,
3442 EnrichMode::Opencode => call_opencode(
3443 binary,
3444 ENTITY_TYPE_VALIDATE_PROMPT,
3445 ENTITY_TYPE_VALIDATE_SCHEMA,
3446 &input_text,
3447 model,
3448 timeout,
3449 )?,
3450 };
3451 let validated_type = value
3452 .get("validated_type")
3453 .and_then(|v| v.as_str())
3454 .unwrap_or(&ent_type);
3455 let was_correct = value
3456 .get("was_correct")
3457 .and_then(|v| v.as_bool())
3458 .unwrap_or(true);
3459 if !was_correct {
3460 conn.execute(
3461 "UPDATE entities SET type = ?1 WHERE id = ?2",
3462 rusqlite::params![validated_type, ent_id],
3463 )?;
3464 }
3465 Ok(EnrichItemResult::Done {
3466 memory_id: None,
3467 entity_id: Some(ent_id),
3468 entities: 1,
3469 rels: 0,
3470 chars_before: None,
3471 chars_after: None,
3472 cost,
3473 is_oauth,
3474 })
3475}
3476
3477fn call_description_enrich(
3479 conn: &Connection,
3480 _namespace: &str,
3481 item_key: &str,
3482 binary: &Path,
3483 model: Option<&str>,
3484 timeout: u64,
3485 mode: &EnrichMode,
3486) -> Result<EnrichItemResult, AppError> {
3487 let (mem_id, body, old_desc): (i64, String, String) = conn
3488 .query_row(
3489 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3490 rusqlite::params![item_key],
3491 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3492 )
3493 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3494 let snippet: String = body.chars().take(500).collect();
3495 let input_text = format!(
3496 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3497 );
3498 let (value, cost, is_oauth) = match mode {
3499 EnrichMode::ClaudeCode => call_claude(
3500 binary,
3501 DESCRIPTION_ENRICH_PROMPT,
3502 DESCRIPTION_ENRICH_SCHEMA,
3503 &input_text,
3504 model,
3505 timeout,
3506 )?,
3507 EnrichMode::Codex => call_codex(
3508 binary,
3509 DESCRIPTION_ENRICH_PROMPT,
3510 DESCRIPTION_ENRICH_SCHEMA,
3511 &input_text,
3512 model,
3513 timeout,
3514 )?,
3515 EnrichMode::Opencode => call_opencode(
3516 binary,
3517 DESCRIPTION_ENRICH_PROMPT,
3518 DESCRIPTION_ENRICH_SCHEMA,
3519 &input_text,
3520 model,
3521 timeout,
3522 )?,
3523 };
3524 let new_desc = value
3525 .get("description")
3526 .and_then(|v| v.as_str())
3527 .unwrap_or(&old_desc);
3528 let old_name: String = conn.query_row(
3529 "SELECT name FROM memories WHERE id = ?1",
3530 rusqlite::params![mem_id],
3531 |r| r.get(0),
3532 )?;
3533 conn.execute(
3534 "UPDATE memories SET description = ?1 WHERE id = ?2",
3535 rusqlite::params![new_desc, mem_id],
3536 )?;
3537 memories::sync_fts_after_update(
3538 conn, mem_id, &old_name, &old_desc, &body, &old_name, new_desc, &body,
3539 )?;
3540 Ok(EnrichItemResult::Done {
3541 memory_id: Some(mem_id),
3542 entity_id: None,
3543 entities: 0,
3544 rels: 0,
3545 chars_before: Some(old_desc.len()),
3546 chars_after: Some(new_desc.len()),
3547 cost,
3548 is_oauth,
3549 })
3550}
3551
3552fn call_domain_classify(
3554 conn: &Connection,
3555 _namespace: &str,
3556 item_key: &str,
3557 binary: &Path,
3558 model: Option<&str>,
3559 timeout: u64,
3560 mode: &EnrichMode,
3561) -> Result<EnrichItemResult, AppError> {
3562 let (mem_id, body, desc): (i64, String, String) = conn
3563 .query_row(
3564 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3565 rusqlite::params![item_key],
3566 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3567 )
3568 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3569 let snippet: String = body.chars().take(500).collect();
3570 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
3571 let (value, cost, is_oauth) = match mode {
3572 EnrichMode::ClaudeCode => call_claude(
3573 binary,
3574 DOMAIN_CLASSIFY_PROMPT,
3575 DOMAIN_CLASSIFY_SCHEMA,
3576 &input_text,
3577 model,
3578 timeout,
3579 )?,
3580 EnrichMode::Codex => call_codex(
3581 binary,
3582 DOMAIN_CLASSIFY_PROMPT,
3583 DOMAIN_CLASSIFY_SCHEMA,
3584 &input_text,
3585 model,
3586 timeout,
3587 )?,
3588 EnrichMode::Opencode => call_opencode(
3589 binary,
3590 DOMAIN_CLASSIFY_PROMPT,
3591 DOMAIN_CLASSIFY_SCHEMA,
3592 &input_text,
3593 model,
3594 timeout,
3595 )?,
3596 };
3597 let domain = value
3598 .get("domain")
3599 .and_then(|v| v.as_str())
3600 .unwrap_or("uncategorized");
3601 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3602 conn.execute(
3603 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3604 rusqlite::params![metadata, mem_id],
3605 )?;
3606 Ok(EnrichItemResult::Done {
3607 memory_id: Some(mem_id),
3608 entity_id: None,
3609 entities: 0,
3610 rels: 0,
3611 chars_before: None,
3612 chars_after: None,
3613 cost,
3614 is_oauth,
3615 })
3616}
3617
3618fn call_graph_audit(
3620 conn: &Connection,
3621 _namespace: &str,
3622 item_key: &str,
3623 binary: &Path,
3624 model: Option<&str>,
3625 timeout: u64,
3626 mode: &EnrichMode,
3627) -> Result<EnrichItemResult, AppError> {
3628 let (mem_id, body, desc): (i64, String, String) = conn
3629 .query_row(
3630 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3631 rusqlite::params![item_key],
3632 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3633 )
3634 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3635 let snippet: String = body.chars().take(500).collect();
3636 let ent_count: i64 = conn
3637 .query_row(
3638 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3639 rusqlite::params![mem_id],
3640 |r| r.get(0),
3641 )
3642 .unwrap_or(0);
3643 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3644 let (value, cost, is_oauth) = match mode {
3645 EnrichMode::ClaudeCode => call_claude(
3646 binary,
3647 GRAPH_AUDIT_PROMPT,
3648 GRAPH_AUDIT_SCHEMA,
3649 &input_text,
3650 model,
3651 timeout,
3652 )?,
3653 EnrichMode::Codex => call_codex(
3654 binary,
3655 GRAPH_AUDIT_PROMPT,
3656 GRAPH_AUDIT_SCHEMA,
3657 &input_text,
3658 model,
3659 timeout,
3660 )?,
3661 EnrichMode::Opencode => call_opencode(
3662 binary,
3663 GRAPH_AUDIT_PROMPT,
3664 GRAPH_AUDIT_SCHEMA,
3665 &input_text,
3666 model,
3667 timeout,
3668 )?,
3669 };
3670 let issues = value
3671 .get("issues")
3672 .and_then(|v| v.as_array())
3673 .map(|a| a.len())
3674 .unwrap_or(0);
3675 Ok(EnrichItemResult::Done {
3676 memory_id: Some(mem_id),
3677 entity_id: None,
3678 entities: 0,
3679 rels: issues,
3680 chars_before: None,
3681 chars_after: None,
3682 cost,
3683 is_oauth,
3684 })
3685}
3686
3687fn call_deep_research_synth(
3689 conn: &Connection,
3690 namespace: &str,
3691 item_key: &str,
3692 binary: &Path,
3693 model: Option<&str>,
3694 timeout: u64,
3695 mode: &EnrichMode,
3696) -> Result<EnrichItemResult, AppError> {
3697 let (mem_id, body): (i64, String) = conn
3698 .query_row(
3699 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3700 rusqlite::params![item_key],
3701 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3702 )
3703 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3704 let snippet: String = body.chars().take(2000).collect();
3705 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3706 let (value, cost, is_oauth) = match mode {
3707 EnrichMode::ClaudeCode => call_claude(
3708 binary,
3709 DEEP_RESEARCH_SYNTH_PROMPT,
3710 DEEP_RESEARCH_SYNTH_SCHEMA,
3711 &input_text,
3712 model,
3713 timeout,
3714 )?,
3715 EnrichMode::Codex => call_codex(
3716 binary,
3717 DEEP_RESEARCH_SYNTH_PROMPT,
3718 DEEP_RESEARCH_SYNTH_SCHEMA,
3719 &input_text,
3720 model,
3721 timeout,
3722 )?,
3723 EnrichMode::Opencode => call_opencode(
3724 binary,
3725 DEEP_RESEARCH_SYNTH_PROMPT,
3726 DEEP_RESEARCH_SYNTH_SCHEMA,
3727 &input_text,
3728 model,
3729 timeout,
3730 )?,
3731 };
3732 let mut ent_count = 0usize;
3733 let mut rel_count = 0usize;
3734 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3735 for e in ents {
3736 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3737 let etype_str = e
3738 .get("entity_type")
3739 .and_then(|v| v.as_str())
3740 .unwrap_or("concept");
3741 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3742 if name.len() >= 2 {
3743 let ne = NewEntity {
3744 name: name.to_string(),
3745 entity_type: etype,
3746 description: None,
3747 };
3748 let _ = entities::upsert_entity(conn, namespace, &ne);
3749 ent_count += 1;
3750 }
3751 }
3752 }
3753 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3754 for r in rels {
3755 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3756 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3757 if src.is_empty() || tgt.is_empty() {
3758 continue;
3759 }
3760 let rel = r
3761 .get("relation")
3762 .and_then(|v| v.as_str())
3763 .unwrap_or("related");
3764 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3765 if let (Some(sid), Some(tid)) = (
3766 entities::find_entity_id(conn, namespace, src)?,
3767 entities::find_entity_id(conn, namespace, tgt)?,
3768 ) {
3769 let _ = entities::create_or_fetch_relationship(
3770 conn, namespace, sid, tid, rel, str_, None,
3771 );
3772 rel_count += 1;
3773 }
3774 }
3775 }
3776 Ok(EnrichItemResult::Done {
3777 memory_id: Some(mem_id),
3778 entity_id: None,
3779 entities: ent_count,
3780 rels: rel_count,
3781 chars_before: None,
3782 chars_after: None,
3783 cost,
3784 is_oauth,
3785 })
3786}
3787
3788fn call_body_extract(
3790 conn: &Connection,
3791 _namespace: &str,
3792 item_key: &str,
3793 binary: &Path,
3794 model: Option<&str>,
3795 timeout: u64,
3796 mode: &EnrichMode,
3797) -> Result<EnrichItemResult, AppError> {
3798 let (mem_id, body, old_desc): (i64, String, String) = conn
3799 .query_row(
3800 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3801 rusqlite::params![item_key],
3802 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3803 )
3804 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3805 let old_name: String = conn.query_row(
3806 "SELECT name FROM memories WHERE id = ?1",
3807 rusqlite::params![mem_id],
3808 |r| r.get(0),
3809 )?;
3810 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3811 let (value, cost, is_oauth) = match mode {
3812 EnrichMode::ClaudeCode => call_claude(
3813 binary,
3814 BODY_EXTRACT_PROMPT,
3815 BODY_EXTRACT_SCHEMA,
3816 &input_text,
3817 model,
3818 timeout,
3819 )?,
3820 EnrichMode::Codex => call_codex(
3821 binary,
3822 BODY_EXTRACT_PROMPT,
3823 BODY_EXTRACT_SCHEMA,
3824 &input_text,
3825 model,
3826 timeout,
3827 )?,
3828 EnrichMode::Opencode => call_opencode(
3829 binary,
3830 BODY_EXTRACT_PROMPT,
3831 BODY_EXTRACT_SCHEMA,
3832 &input_text,
3833 model,
3834 timeout,
3835 )?,
3836 };
3837 let restructured = value
3838 .get("restructured_body")
3839 .and_then(|v| v.as_str())
3840 .unwrap_or(&body);
3841 let chars_before = body.len();
3842 let chars_after = restructured.len();
3843 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3844 conn.execute(
3845 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3846 rusqlite::params![restructured, new_hash, mem_id],
3847 )?;
3848 memories::sync_fts_after_update(
3849 conn,
3850 mem_id,
3851 &old_name,
3852 &old_desc,
3853 &body,
3854 &old_name,
3855 &old_desc,
3856 restructured,
3857 )?;
3858 Ok(EnrichItemResult::Done {
3859 memory_id: Some(mem_id),
3860 entity_id: None,
3861 entities: 0,
3862 rels: 0,
3863 chars_before: Some(chars_before),
3864 chars_after: Some(chars_after),
3865 cost,
3866 is_oauth,
3867 })
3868}
3869
3870#[allow(clippy::type_complexity)]
3872fn scan_isolated_entity_pairs(
3873 conn: &Connection,
3874 namespace: &str,
3875 limit: Option<usize>,
3876) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3877 let limit_val = limit.unwrap_or(50) as i64;
3878 let mut stmt = conn.prepare_cached(
3879 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3880 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3881 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3882 (r.source_id = e1.id AND r.target_id = e2.id) OR \
3883 (r.source_id = e2.id AND r.target_id = e1.id)) \
3884 LIMIT ?2",
3885 )?;
3886 let rows = stmt
3887 .query_map(rusqlite::params![namespace, limit_val], |r| {
3888 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3889 })?
3890 .collect::<Result<Vec<_>, _>>()?;
3891 Ok(rows)
3892}
3893
3894fn scan_entities_for_type_validation(
3896 conn: &Connection,
3897 namespace: &str,
3898 limit: Option<usize>,
3899) -> Result<Vec<(i64, String, String)>, AppError> {
3900 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3901 let sql = format!(
3902 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3903 );
3904 let mut stmt = conn.prepare(&sql)?;
3905 let rows = stmt
3906 .query_map(rusqlite::params![namespace], |r| {
3907 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3908 })?
3909 .collect::<Result<Vec<_>, _>>()?;
3910 Ok(rows)
3911}
3912
3913fn scan_generic_descriptions(
3915 conn: &Connection,
3916 namespace: &str,
3917 limit: Option<usize>,
3918) -> Result<Vec<(i64, String, String)>, AppError> {
3919 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3920 let sql = format!(
3921 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3922 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3923 ORDER BY id {limit_clause}"
3924 );
3925 let mut stmt = conn.prepare(&sql)?;
3926 let rows = stmt
3927 .query_map(rusqlite::params![namespace], |r| {
3928 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3929 })?
3930 .collect::<Result<Vec<_>, _>>()?;
3931 Ok(rows)
3932}
3933
3934fn call_codex(
3938 binary: &Path,
3939 prompt: &str,
3940 json_schema: &str,
3941 input_text: &str,
3942 model: Option<&str>,
3943 timeout_secs: u64,
3944) -> Result<(serde_json::Value, f64, bool), AppError> {
3945 use wait_timeout::ChildExt;
3946
3947 super::codex_spawn::validate_codex_model(model)?;
3952 let schema_file = super::codex_spawn::trusted_schema_path()?;
3953
3954 let args = super::codex_spawn::CodexSpawnArgs {
3955 binary,
3956 prompt,
3957 json_schema,
3958 input_text,
3959 model,
3960 timeout_secs,
3961 schema_path: schema_file.clone(),
3962 };
3963 let mut cmd = super::codex_spawn::build_codex_command(&args)?;
3964
3965 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3966 AppError::Io(std::io::Error::new(
3967 e.kind(),
3968 format!("failed to spawn codex: {e}"),
3969 ))
3970 })?;
3971
3972 let full_prompt = format!("{prompt}\n\n{input_text}");
3973 let stdin_bytes = full_prompt.into_bytes();
3974 let mut child_stdin = child
3975 .stdin
3976 .take()
3977 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3978 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3979 child_stdin.write_all(&stdin_bytes)?;
3980 drop(child_stdin);
3981 Ok(())
3982 });
3983
3984 let start = std::time::Instant::now();
3985 let timeout = std::time::Duration::from_secs(timeout_secs);
3986 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3987 let _ = std::fs::remove_file(&schema_file);
3988
3989 match status {
3990 Some(exit_status) => {
3991 stdin_thread
3992 .join()
3993 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3994 .map_err(AppError::Io)?;
3995
3996 tracing::debug!(
3997 target: "process",
3998 exit_code = ?exit_status.code(),
3999 elapsed_ms = start.elapsed().as_millis() as u64,
4000 "external process completed"
4001 );
4002
4003 let mut stdout_buf = Vec::new();
4004 if let Some(mut out) = child.stdout.take() {
4005 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4006 }
4007 if !exit_status.success() {
4008 let mut stderr_buf = Vec::new();
4009 if let Some(mut err) = child.stderr.take() {
4010 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4011 }
4012 let stderr_str = String::from_utf8_lossy(&stderr_buf);
4013 tracing::warn!(
4014 target: "enrich",
4015 exit_code = ?exit_status.code(),
4016 stderr = %stderr_str.trim(),
4017 "codex process failed"
4018 );
4019 return Err(AppError::Validation(format!(
4020 "codex exited with code {:?}: {}",
4021 exit_status.code(),
4022 stderr_str.trim()
4023 )));
4024 }
4025 let stdout_str = String::from_utf8(stdout_buf)
4026 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
4027 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
4030 let value: serde_json::Value =
4036 serde_json::from_str(&result.last_agent_text).map_err(|e| {
4037 AppError::Validation(format!(
4038 "codex agent_message is not valid JSON: {e}; raw={}",
4039 result.last_agent_text
4040 ))
4041 })?;
4042 Ok((value, 0.0, false))
4043 }
4044 None => {
4045 let _ = child.kill();
4046 let _ = child.wait();
4047 let _ = stdin_thread.join();
4048 Err(AppError::Validation(format!(
4049 "codex timed out after {timeout_secs} seconds"
4050 )))
4051 }
4052 }
4053}
4054
4055fn call_opencode(
4056 binary: &Path,
4057 prompt: &str,
4058 json_schema: &str,
4059 input_text: &str,
4060 model: Option<&str>,
4061 timeout_secs: u64,
4062) -> Result<(serde_json::Value, f64, bool), AppError> {
4063 use wait_timeout::ChildExt;
4064
4065 let resolved_model = super::opencode_runner::resolve_opencode_model(model);
4066
4067 let augmented_prompt = if json_schema.is_empty() {
4068 prompt.to_string()
4069 } else {
4070 format!(
4071 "{prompt}\n\nIMPORTANT: You MUST respond with ONLY valid JSON (no markdown, no explanation, no code fences). \
4072 The JSON MUST match this schema:\n{json_schema}"
4073 )
4074 };
4075
4076 let mut cmd = super::opencode_runner::build_opencode_command_sync(
4077 binary,
4078 &resolved_model,
4079 &augmented_prompt,
4080 input_text,
4081 )?;
4082
4083 let mut child = super::opencode_runner::spawn_opencode(&mut cmd).map_err(|e| {
4084 AppError::Io(std::io::Error::new(
4085 e.kind(),
4086 format!("failed to spawn opencode: {e}"),
4087 ))
4088 })?;
4089
4090 let start = std::time::Instant::now();
4091 let timeout = std::time::Duration::from_secs(timeout_secs);
4092 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4093
4094 match status {
4095 Some(exit_status) => {
4096 tracing::debug!(
4097 target: "process",
4098 exit_code = ?exit_status.code(),
4099 elapsed_ms = start.elapsed().as_millis() as u64,
4100 "opencode process completed"
4101 );
4102
4103 let mut stdout_buf = Vec::new();
4104 if let Some(mut out) = child.stdout.take() {
4105 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4106 }
4107 if !exit_status.success() {
4108 let mut stderr_buf = Vec::new();
4109 if let Some(mut err) = child.stderr.take() {
4110 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4111 }
4112 let stderr_str = String::from_utf8_lossy(&stderr_buf);
4113 tracing::warn!(
4114 target: "enrich",
4115 exit_code = ?exit_status.code(),
4116 stderr = %stderr_str.trim(),
4117 "opencode process failed"
4118 );
4119 return Err(AppError::Validation(format!(
4120 "opencode exited with code {:?}: {}",
4121 exit_status.code(),
4122 stderr_str.trim()
4123 )));
4124 }
4125 let stdout_str = String::from_utf8(stdout_buf)
4126 .map_err(|_| AppError::Validation("opencode stdout is not valid UTF-8".into()))?;
4127 let (text, cost, _tokens) = super::opencode_runner::parse_opencode_output(&stdout_str)?;
4128 let value: serde_json::Value =
4129 super::opencode_runner::parse_json_from_opencode_text(&text).map_err(|e| {
4130 AppError::Validation(format!("opencode response is not valid JSON: {e}"))
4131 })?;
4132 Ok((value, cost, false))
4133 }
4134 None => {
4135 let _ = child.kill();
4136 let _ = child.wait();
4137 Err(AppError::Validation(format!(
4138 "opencode timed out after {timeout_secs} seconds"
4139 )))
4140 }
4141 }
4142}
4143
/// Unit tests for the enrichment scanners, queue DB, Claude output parsing, the Codex runner
/// and the prompt schemas.
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;
    #[cfg(unix)]
    use std::os::unix::fs::PermissionsExt;

    /// Opens an in-memory SQLite database with a minimal schema for the tables the scanners
    /// touch: memories, entities, memory_entities, relationships and memory_embeddings.
    /// NOTE(review): this is a hand-written subset of the production schema (no FTS tables);
    /// keep it in sync when the real migrations change.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );
            CREATE TABLE memory_embeddings (
                memory_id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                embedding BLOB NOT NULL,
                source TEXT NOT NULL,
                model TEXT NOT NULL DEFAULT '',
                dim INTEGER NOT NULL DEFAULT 384,
                created_at INTEGER NOT NULL DEFAULT (unixepoch())
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    // A memory with no memory_entities rows is reported as unbound.
    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    // Linking the memory to an entity through memory_entities removes it from the scan.
    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    // Entities whose description column is NULL are selected for description enrichment.
    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    // Entities that already have a description are skipped.
    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    // A body shorter than the threshold (100 here) is returned by the short-body scan.
    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    // A 1000-character body exceeds the 100-character threshold and is not returned.
    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    // Five matching rows with Some(3) must yield exactly three results.
    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            conn.execute(
                &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
                [],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3), &[]).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    // Only the memory without a stored embedding (written via memories::upsert_vec) is
    // returned.
    #[test]
    fn scan_memories_without_embeddings_finds_only_missing_rows() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
            [],
        )
        .unwrap();
        let memory_id: i64 = conn
            .query_row(
                "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
        memories::upsert_vec(
            &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
        )
        .unwrap();

        let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "missing-vec");
    }

    // A non-empty name filter restricts the embedding scan to the listed memory names.
    #[test]
    fn scan_memories_without_embeddings_respects_name_filter() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
            [],
        )
        .unwrap();

        let results =
            scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
                .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "match-me");
    }

    // The queue database opens with an empty `queue` table.
    // NOTE(review): uses a hard-coded /tmp path keyed by PID; a tempfile::tempdir() (already
    // used below) would be more portable and would not leak on assertion failure.
    #[test]
    fn queue_db_schema_creates_correctly() {
        let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
        let _ = std::fs::remove_file(&tmp_path);
    }

    // A successful result event yields the structured output, its cost, and is_oauth = false.
    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    // `apiKeySource: "none"` on the init event marks the run as OAuth.
    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    // An error result mentioning rate_limit maps to AppError::RateLimited.
    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    // Other error results surface the original error text in the error's Display output.
    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    // End-to-end call_codex against a bash mock that prints canned JSONL events; the agent
    // message text must come back parsed as JSON with zero cost and is_oauth = false.
    #[cfg(unix)]
    #[test]
    fn call_codex_returns_raw_json_for_body_enrich_schema() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let binary = tmp.path().join("codex-mock");
        // The script body must stay at column 0: the heredoc terminator `JSONL` must start
        // its line.
        std::fs::write(
            &binary,
            r#"#!/usr/bin/env bash
set -euo pipefail
cat <<'JSONL'
{"type":"thread.started","thread_id":"mock-thread-0"}
{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
JSONL
"#,
        )
        .expect("mock codex write");
        let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&binary, perms).expect("chmod");

        let (value, cost, is_oauth) =
            call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
                .expect("call_codex must accept body-enrich payload");

        assert_eq!(value["enriched_body"], "expanded body");
        assert_eq!(cost, 0.0);
        assert!(!is_oauth);
    }

    // NOTE(review): despite its name, this test only exercises scan_short_body_memories; it
    // does not drive the dry-run path itself.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    // persist_entity_description writes the given text into entities.description.
    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    // The schema constants are embedded JSON; these guard against syntax errors in them.
    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}