1use crate::commands::ingest_claude::find_claude_binary;
25use crate::constants::MAX_MEMORY_BODY_LEN;
26use crate::entity_type::EntityType;
27use crate::errors::AppError;
28use crate::paths::AppPaths;
29use crate::storage::connection::{ensure_db_ready, open_rw};
30use crate::storage::entities::{self, NewEntity, NewRelationship};
31use crate::storage::memories;
32
33use rusqlite::Connection;
34use serde::{Deserialize, Serialize};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::time::Instant;
38
/// Default file name of the on-disk enrichment work queue (a SQLite file).
/// NOTE(review): presumably the path handed to `open_queue_db` — confirm at the call site.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default rate-limit wait.
/// NOTE(review): unit and use site are outside this view — presumably seconds; confirm.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default for `--min-output-chars` (body-enrich lower length target).
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default for `--max-output-chars` (body-enrich upper length target).
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
47
/// Structured-output JSON Schema for entity/relationship extraction.
///
/// Shape matches what `persist_memory_bindings` parses:
/// - `entities`: `[{name, entity_type}]`, with `entity_type` restricted to the ten known types;
/// - `relationships`: `[{source, target, relation, strength}]`, with `relation` restricted to
///   the canonical kebab-case names and `strength` bounded to `[0, 1]`.
///
/// Every object sets `"additionalProperties": false`, so the model cannot add stray keys.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;

/// Structured-output JSON Schema for an entity description: a single required
/// `description` string.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// Structured-output JSON Schema for body enrichment: a single required
/// `enriched_body` string.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
109
/// System prompt for auditing a relationship weight against the fixed
/// 0.9 / 0.7 / 0.5 / 0.3 scale (the same scale `BINDINGS_PROMPT` uses for strength).
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// Response schema for `WEIGHT_CALIBRATE_PROMPT`: `calibrated_weight` in `[0, 1]`
/// plus free-text `reasoning`.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt asking the model to replace a generic relation with one of the
/// canonical relations listed inline.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// Response schema for `RELATION_RECLASSIFY_PROMPT`. `relation` is a free string
/// (not an enum), so callers should validate/normalize it before persisting.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt asking whether two unconnected entities should be linked; the
/// model answers `"none"` when no meaningful relationship exists.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// Response schema for `ENTITY_CONNECT_PROMPT`; `relation` may be the sentinel `"none"`.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt for checking an entity's type against the ten valid entity types
/// (the same list as the `entity_type` enum in `BINDINGS_SCHEMA`).
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// Response schema for `ENTITY_TYPE_VALIDATE_PROMPT`: the validated type, whether
/// the original was correct, and reasoning.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt for replacing a generic memory description with a 10-20 word
/// semantic one (includes a BAD/GOOD example pair).
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// Response schema for `DESCRIPTION_ENRICH_PROMPT`.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;

/// System prompt for assigning a memory a kebab-case domain label.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// Response schema for `DOMAIN_CLASSIFY_PROMPT`. Besides the domain and reasoning it
/// also requires a `confidence` in `[0, 1]`, which the prompt text does not mention.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
220
/// System prompt for auditing one memory and its entity bindings for graph-quality issues.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// Response schema for `GRAPH_AUDIT_PROMPT`: a `quality_score` in `[0, 1]`, a list of
/// `{kind, detail}` issues, and free-text `reasoning`.
///
/// The nested issue object sets `"additionalProperties": false` like every other
/// object schema in this file. Strict structured-output validators require it on
/// every object and reject the schema otherwise. It also stops the model from
/// adding stray keys.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "kind": { "type": "string" },
          "detail": { "type": "string" }
        },
        "required": ["kind", "detail"],
        "additionalProperties": false
      }
    },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
236
/// System prompt for synthesizing entities, relationships and a summary from a memory body.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// Response schema for `DEEP_RESEARCH_SYNTH_PROMPT`.
///
/// Nested entity and relationship objects set `"additionalProperties": false`, and
/// `strength` is bounded to `[0, 1]`, matching `BINDINGS_SCHEMA`. Strict
/// structured-output validators reject object schemas without `additionalProperties: false`.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": { "type": "string" }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": { "type": "string" },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source", "target", "relation", "strength"],
        "additionalProperties": false
      }
    },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
253
/// System prompt for restructuring an unstructured memory body into clean markdown
/// while preserving every fact.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// Response schema for `BODY_EXTRACT_PROMPT`: the restructured body plus a change summary.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;

/// System prompt for entity/relationship extraction. Its relation list and
/// strength scale mirror the enums and bounds in `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";

/// Prompt prefix for entity descriptions. It ends with `"Entity name: "`, so the
/// caller is expected to append the entity name (and presumably its type).
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Prompt prefix for body enrichment. It ends just before the body text, which the
/// caller appends. The target length is expected to arrive separately (the
/// prompt refers to "the system context").
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
287
// Enrichment pass selected by `enrich --operation`.
//
// Plain `//` comments are used on purpose: `///` doc comments on a
// `clap::ValueEnum` variant become that value's `--help` text.
// Serde uses kebab-case (`rename_all`), e.g. `MemoryBindings` <-> "memory-bindings".
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Help example: "Add missing entity bindings to all unbound memories".
    MemoryBindings,
    // Help example: "Fill entity descriptions".
    EntityDescriptions,
    // Help example: "Expand short memory bodies (GAP-18)".
    BodyEnrich,
    // NOTE(review): presumably driven by WEIGHT_CALIBRATE_PROMPT/SCHEMA — confirm at dispatch.
    WeightCalibrate,
    // NOTE(review): presumably driven by RELATION_RECLASSIFY_PROMPT/SCHEMA.
    RelationReclassify,
    // NOTE(review): presumably driven by ENTITY_CONNECT_PROMPT/SCHEMA.
    EntityConnect,
    // NOTE(review): presumably driven by ENTITY_TYPE_VALIDATE_PROMPT/SCHEMA.
    EntityTypeValidate,
    // NOTE(review): presumably driven by DESCRIPTION_ENRICH_PROMPT/SCHEMA.
    DescriptionEnrich,
    // No matching prompt constant in this part of the file — behavior defined elsewhere.
    CrossDomainBridges,
    // NOTE(review): presumably driven by DOMAIN_CLASSIFY_PROMPT/SCHEMA.
    DomainClassify,
    // NOTE(review): presumably driven by GRAPH_AUDIT_PROMPT/SCHEMA.
    GraphAudit,
    // NOTE(review): presumably driven by DEEP_RESEARCH_SYNTH_PROMPT/SCHEMA.
    DeepResearchSynth,
    // NOTE(review): presumably driven by BODY_EXTRACT_PROMPT/SCHEMA.
    BodyExtract,
}
323
// LLM provider backend used by `enrich` (`--mode`, `--fallback-mode`).
//
// Plain `//` comments on purpose: `///` on a `clap::ValueEnum` variant becomes
// `--help` text.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Claude Code CLI, located via `find_claude_binary` (`--claude-binary`).
    ClaudeCode,
    // Codex CLI, located via `find_codex_binary` (`--codex-binary`).
    Codex,
}
332
333impl std::fmt::Display for EnrichMode {
334 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
335 match self {
336 EnrichMode::ClaudeCode => write!(f, "claude-code"),
337 EnrichMode::Codex => write!(f, "codex"),
338 }
339 }
340}
341
342#[derive(clap::Args)]
344#[command(
345 about = "Enrich graph memories and entities using an LLM provider",
346 after_long_help = "EXAMPLES:\n \
347 # Add missing entity bindings to all unbound memories\n \
348 sqlite-graphrag enrich --operation memory-bindings --mode claude-code\n\n \
349 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
350 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
351 # Expand short memory bodies (GAP-18)\n \
352 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
353 # Resume an interrupted body-enrich run\n \
354 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
355 # Retry only failed items from a previous run\n \
356 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
357 EXIT CODES:\n \
358 0 success\n \
359 1 validation error (bad args, binary not found)\n \
360 14 I/O error"
361)]
362pub struct EnrichArgs {
363 #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
365 pub operation: EnrichOperation,
366
367 #[arg(long, value_enum, default_value = "claude-code")]
369 pub mode: EnrichMode,
370
371 #[arg(long, value_name = "N")]
373 pub limit: Option<usize>,
374
375 #[arg(long)]
377 pub dry_run: bool,
378
379 #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
381 pub namespace: Option<String>,
382
383 #[arg(long, value_name = "PATH")]
386 pub claude_binary: Option<PathBuf>,
387
388 #[arg(long, value_name = "MODEL")]
390 pub claude_model: Option<String>,
391
392 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
394 pub claude_timeout: u64,
395
396 #[arg(long, value_name = "PATH")]
399 pub codex_binary: Option<PathBuf>,
400
401 #[arg(long, value_name = "MODEL")]
403 pub codex_model: Option<String>,
404
405 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
407 pub codex_timeout: u64,
408
409 #[arg(long, value_name = "USD")]
412 pub max_cost_usd: Option<f64>,
413
414 #[arg(long)]
417 pub resume: bool,
418
419 #[arg(long)]
421 pub retry_failed: bool,
422
423 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
426 pub min_output_chars: usize,
427
428 #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
430 pub max_output_chars: usize,
431
432 #[arg(long, default_value_t = true)]
434 pub preserve_check: bool,
435
436 #[arg(long, value_name = "PATH")]
438 pub prompt_template: Option<PathBuf>,
439
440 #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
444 pub llm_parallelism: u32,
445
446 #[arg(long)]
449 pub json: bool,
450
451 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
453 pub db: Option<String>,
454
455 #[arg(long, value_name = "SECONDS")]
458 pub wait_job_singleton: Option<u64>,
459
460 #[arg(long, default_value_t = false)]
464 pub force_job_singleton: bool,
465
466 #[arg(long, value_name = "NAMES", value_delimiter = ',')]
470 pub names: Vec<String>,
471
472 #[arg(long, value_name = "PATH")]
476 pub names_file: Option<PathBuf>,
477
478 #[arg(long, default_value_t = false)]
482 pub preflight_check: bool,
483
484 #[arg(long, value_enum)]
488 pub fallback_mode: Option<EnrichMode>,
489
490 #[arg(long, value_name = "SECONDS", default_value_t = 300)]
493 pub rate_limit_buffer: u64,
494
495 #[arg(long, default_value_t = true)]
499 pub max_load_check: bool,
500
501 #[arg(long, value_name = "N", default_value_t = 5)]
504 pub circuit_breaker_threshold: u32,
505
506 #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
513 pub preserve_threshold: f64,
514
515 #[arg(long, default_value_t = true)]
520 pub codex_model_validate: bool,
521
522 #[arg(long, value_name = "MODEL")]
527 pub codex_model_fallback: Option<String>,
528}
529
/// Phase-transition record, serialized as JSON.
///
/// `None` fields are omitted from the output (`skip_serializing_if`).
/// NOTE(review): presumably emitted through `emit_json` (`crate::output::emit_json_line`) —
/// confirm at the call sites.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label; values are chosen by callers outside this view.
    phase: &'a str,
    /// Provider binary path, when the phase concerns binary discovery.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Provider version string, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of items in the run.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Items still pending.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Effective `--llm-parallelism`.
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
553
/// Per-item progress record, serialized as JSON.
///
/// `None` fields are omitted from the output (`skip_serializing_if`).
/// Field order is the JSON key order.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Item identifier. NOTE(review): presumably the queue `item_key` — confirm.
    item: &'a str,
    /// Outcome label; values are set by callers outside this view.
    status: &'a str,
    /// Memory row id, for memory-scoped operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity row id, for entity-scoped operations.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Number of entities persisted for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships persisted for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before enrichment.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after enrichment.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Provider cost attributed to this item.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item in the run (base set by callers).
    index: usize,
    /// Total number of items in the run.
    total: usize,
}
580
/// End-of-run totals, serialized as JSON.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field. NOTE(review): presumably always `true` so consumers can tell
    /// the summary line apart from item/phase events — confirm at construction.
    summary: bool,
    /// Operation name.
    operation: String,
    /// Items selected for the run.
    items_total: usize,
    /// Items that completed successfully.
    completed: usize,
    /// Items that failed.
    failed: usize,
    /// Items that were skipped.
    skipped: usize,
    /// Accumulated provider cost.
    cost_usd: f64,
    /// Total wall-clock time, in milliseconds.
    elapsed_ms: u64,
}
592
593use crate::output::emit_json_line as emit_json;
594
595fn open_queue_db(path: &str) -> Result<Connection, AppError> {
610 let conn = Connection::open(path)?;
611 conn.pragma_update(None, "journal_mode", "wal")?;
612 conn.execute_batch(
613 "CREATE TABLE IF NOT EXISTS queue (
614 id INTEGER PRIMARY KEY AUTOINCREMENT,
615 item_key TEXT NOT NULL UNIQUE,
616 item_type TEXT NOT NULL DEFAULT 'memory',
617 status TEXT NOT NULL DEFAULT 'pending',
618 memory_id INTEGER,
619 entity_id INTEGER,
620 entities INTEGER DEFAULT 0,
621 rels INTEGER DEFAULT 0,
622 error TEXT,
623 cost_usd REAL DEFAULT 0.0,
624 attempt INTEGER DEFAULT 0,
625 elapsed_ms INTEGER,
626 created_at TEXT DEFAULT (datetime('now')),
627 done_at TEXT
628 );
629 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
630 )?;
631 Ok(conn)
632}
633
634fn call_claude(
642 binary: &Path,
643 prompt: &str,
644 json_schema: &str,
645 input_text: &str,
646 model: Option<&str>,
647 timeout_secs: u64,
648) -> Result<(serde_json::Value, f64, bool), AppError> {
649 let result = crate::commands::claude_runner::run_claude(
650 binary,
651 prompt,
652 json_schema,
653 input_text,
654 model,
655 timeout_secs,
656 7,
657 )?;
658 Ok((result.value, result.cost_usd, result.is_oauth))
659}
660
/// Result of `run_preflight_probe`.
enum PreflightOutcome {
    /// The provider process exited successfully.
    Healthy,
    /// The provider failed, and its stderr matched a known rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the failed probe.
        reason: String,
        /// Static, human-readable remediation hint.
        suggestion: &'static str,
    },
    /// Any other failure: binary lookup, spawn, timeout, or a non-rate-limit error exit.
    Error(AppError),
}
679
680fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
688 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
689
690 match args.mode {
691 EnrichMode::ClaudeCode => {
692 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
693 Ok(b) => b,
694 Err(e) => return PreflightOutcome::Error(e),
695 };
696 let mut cmd = std::process::Command::new(&bin);
697 cmd.env_clear();
698 for var in &["PATH", "HOME", "USER"] {
699 if let Ok(val) = std::env::var(var) {
700 cmd.env(var, val);
701 }
702 }
703 cmd.arg("-p")
704 .arg("ping")
705 .arg("--max-turns")
706 .arg("1")
707 .arg("--strict-mcp-config")
708 .arg("--mcp-config")
709 .arg("{}")
710 .arg("--dangerously-skip-permissions")
711 .arg("--settings")
712 .arg("{\"hooks\":{}}")
713 .arg("--output-format")
714 .arg("json")
715 .stdin(std::process::Stdio::null())
716 .stdout(std::process::Stdio::piped())
717 .stderr(std::process::Stdio::piped());
718
719 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
720 Ok(c) => c,
721 Err(e) => {
722 return PreflightOutcome::Error(AppError::Io(e));
723 }
724 };
725 let output = match wait_with_timeout(child, timeout) {
726 Ok(out) => out,
727 Err(e) => return PreflightOutcome::Error(e),
728 };
729 if !output.status.success() {
730 let stderr = String::from_utf8_lossy(&output.stderr);
731 if stderr.contains("hit your session limit")
732 || stderr.contains("rate_limit")
733 || stderr.contains("429")
734 {
735 return PreflightOutcome::RateLimited {
736 reason: stderr.trim().to_string(),
737 suggestion:
738 "wait for the OAuth window to reset or use --fallback-mode codex",
739 };
740 }
741 return PreflightOutcome::Error(AppError::Validation(format!(
742 "preflight probe failed: {stderr}",
743 stderr = stderr.trim()
744 )));
745 }
746 PreflightOutcome::Healthy
747 }
748 EnrichMode::Codex => {
749 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
750 Ok(b) => b,
751 Err(e) => return PreflightOutcome::Error(e),
752 };
753 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
754 .map_err(PreflightOutcome::Error)
755 .ok();
756 let schema = "{}";
757 let schema_path = match super::codex_spawn::trusted_schema_path() {
758 Ok(p) => p,
759 Err(e) => return PreflightOutcome::Error(e),
760 };
761 let spawn_args = super::codex_spawn::CodexSpawnArgs {
762 binary: &bin,
763 prompt: "ping",
764 json_schema: schema,
765 input_text: "",
766 model: args.codex_model.as_deref(),
767 timeout_secs: args.rate_limit_buffer.max(60),
768 schema_path: schema_path.clone(),
769 };
770 let mut cmd = super::codex_spawn::build_codex_command(&spawn_args);
771 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
772 Ok(c) => c,
773 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
774 };
775 let output = match wait_with_timeout(child, timeout) {
776 Ok(out) => out,
777 Err(e) => return PreflightOutcome::Error(e),
778 };
779 let _ = std::fs::remove_file(&schema_path);
780 if !output.status.success() {
781 let stderr = String::from_utf8_lossy(&output.stderr);
782 if stderr.contains("rate_limit")
783 || stderr.contains("429")
784 || stderr.contains("Too Many Requests")
785 {
786 return PreflightOutcome::RateLimited {
787 reason: stderr.trim().to_string(),
788 suggestion: "wait for the rate-limit window to reset",
789 };
790 }
791 return PreflightOutcome::Error(AppError::Validation(format!(
792 "preflight probe failed: {stderr}",
793 stderr = stderr.trim()
794 )));
795 }
796 PreflightOutcome::Healthy
797 }
798 }
799}
800
801fn wait_with_timeout(
803 mut child: std::process::Child,
804 timeout: std::time::Duration,
805) -> Result<std::process::Output, AppError> {
806 use wait_timeout::ChildExt;
807 let start = std::time::Instant::now();
808 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
809 if status.is_none() {
810 let _ = child.kill();
811 let _ = child.wait();
812 return Err(AppError::Validation(format!(
813 "preflight probe timed out after {}s",
814 start.elapsed().as_secs()
815 )));
816 }
817 let mut stdout = Vec::new();
818 if let Some(mut out) = child.stdout.take() {
819 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
820 }
821 let mut stderr = Vec::new();
822 if let Some(mut err) = child.stderr.take() {
823 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
824 }
825 let exit = status.unwrap();
826 Ok(std::process::Output {
827 status: exit,
828 stdout,
829 stderr,
830 })
831}
832
833fn scan_unbound_memories(
844 conn: &Connection,
845 namespace: &str,
846 limit: Option<usize>,
847 name_filter: &[String],
848) -> Result<Vec<(i64, String, String)>, AppError> {
849 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
850
851 if name_filter.is_empty() {
852 let sql = format!(
853 "SELECT m.id, m.name, m.body
854 FROM memories m
855 WHERE m.namespace = ?1
856 AND m.deleted_at IS NULL
857 AND NOT EXISTS (
858 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
859 )
860 ORDER BY m.id
861 {limit_clause}"
862 );
863 let mut stmt = conn.prepare(&sql)?;
864 let rows = stmt
865 .query_map(rusqlite::params![namespace], |r| {
866 Ok((
867 r.get::<_, i64>(0)?,
868 r.get::<_, String>(1)?,
869 r.get::<_, String>(2)?,
870 ))
871 })?
872 .collect::<Result<Vec<_>, _>>()?;
873 Ok(rows)
874 } else {
875 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
877 .map(|i| format!("?{i}"))
878 .collect();
879 let in_clause = placeholders.join(", ");
880 let sql = format!(
881 "SELECT m.id, m.name, m.body
882 FROM memories m
883 WHERE m.namespace = ?1
884 AND m.deleted_at IS NULL
885 AND m.name IN ({in_clause})
886 AND NOT EXISTS (
887 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
888 )
889 ORDER BY m.id
890 {limit_clause}"
891 );
892 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
893 params_vec.push(&namespace);
894 for n in name_filter {
895 params_vec.push(n);
896 }
897 let mut stmt = conn.prepare(&sql)?;
898 let rows = stmt
899 .query_map(
900 rusqlite::params_from_iter(params_vec.iter().copied()),
901 |r| {
902 Ok((
903 r.get::<_, i64>(0)?,
904 r.get::<_, String>(1)?,
905 r.get::<_, String>(2)?,
906 ))
907 },
908 )?
909 .collect::<Result<Vec<_>, _>>()?;
910 Ok(rows)
911 }
912}
913
914fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
919 let content = std::fs::read_to_string(path).map_err(|e| {
920 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
921 })?;
922 let mut seen = std::collections::HashSet::new();
923 let mut out = Vec::new();
924 for line in content.lines() {
925 let trimmed = line.trim();
926 if trimmed.is_empty() || trimmed.starts_with('#') {
927 continue;
928 }
929 if seen.insert(trimmed.to_string()) {
930 out.push(trimmed.to_string());
931 }
932 }
933 Ok(out)
934}
935
936fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
938 let mut combined: Vec<String> = args.names.clone();
939 if let Some(p) = &args.names_file {
940 let from_file = read_names_file(p)?;
941 for n in from_file {
942 if !combined.contains(&n) {
943 combined.push(n);
944 }
945 }
946 }
947 Ok(combined)
948}
949
950fn scan_entities_without_description(
954 conn: &Connection,
955 namespace: &str,
956 limit: Option<usize>,
957) -> Result<Vec<(i64, String, String)>, AppError> {
958 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
959 let sql = format!(
960 "SELECT id, name, type
961 FROM entities
962 WHERE namespace = ?1
963 AND (description IS NULL OR description = '')
964 ORDER BY id
965 {limit_clause}"
966 );
967 let mut stmt = conn.prepare(&sql)?;
968 let rows = stmt
969 .query_map(rusqlite::params![namespace], |r| {
970 Ok((
971 r.get::<_, i64>(0)?,
972 r.get::<_, String>(1)?,
973 r.get::<_, String>(2)?,
974 ))
975 })?
976 .collect::<Result<Vec<_>, _>>()?;
977 Ok(rows)
978}
979
980fn scan_short_body_memories(
984 conn: &Connection,
985 namespace: &str,
986 min_chars: usize,
987 limit: Option<usize>,
988) -> Result<Vec<(i64, String, String)>, AppError> {
989 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
990 let sql = format!(
991 "SELECT m.id, m.name, m.body
992 FROM memories m
993 WHERE m.namespace = ?1
994 AND m.deleted_at IS NULL
995 AND LENGTH(COALESCE(m.body,'')) < ?2
996 ORDER BY m.id
997 {limit_clause}"
998 );
999 let mut stmt = conn.prepare(&sql)?;
1000 let rows = stmt
1001 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1002 Ok((
1003 r.get::<_, i64>(0)?,
1004 r.get::<_, String>(1)?,
1005 r.get::<_, String>(2)?,
1006 ))
1007 })?
1008 .collect::<Result<Vec<_>, _>>()?;
1009 Ok(rows)
1010}
1011
1012#[allow(clippy::type_complexity)]
1014fn scan_weight_candidates(
1015 conn: &Connection,
1016 namespace: &str,
1017 limit: Option<usize>,
1018) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1019 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1020 let sql = format!(
1021 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1022 FROM relationships r \
1023 JOIN entities e1 ON e1.id = r.source_id \
1024 JOIN entities e2 ON e2.id = r.target_id \
1025 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1026 ORDER BY r.weight DESC {limit_clause}"
1027 );
1028 let mut stmt = conn.prepare(&sql)?;
1029 let rows = stmt
1030 .query_map(rusqlite::params![namespace], |r| {
1031 Ok((
1032 r.get::<_, i64>(0)?,
1033 r.get::<_, String>(1)?,
1034 r.get::<_, String>(2)?,
1035 r.get::<_, String>(3)?,
1036 r.get::<_, f64>(4)?,
1037 ))
1038 })?
1039 .collect::<Result<Vec<_>, _>>()?;
1040 Ok(rows)
1041}
1042
1043fn scan_generic_relations(
1045 conn: &Connection,
1046 namespace: &str,
1047 limit: Option<usize>,
1048) -> Result<Vec<(i64, String, String, String)>, AppError> {
1049 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1050 let sql = format!(
1051 "SELECT r.id, e1.name, e2.name, r.relation \
1052 FROM relationships r \
1053 JOIN entities e1 ON e1.id = r.source_id \
1054 JOIN entities e2 ON e2.id = r.target_id \
1055 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1056 ORDER BY r.id {limit_clause}"
1057 );
1058 let mut stmt = conn.prepare(&sql)?;
1059 let rows = stmt
1060 .query_map(rusqlite::params![namespace], |r| {
1061 Ok((
1062 r.get::<_, i64>(0)?,
1063 r.get::<_, String>(1)?,
1064 r.get::<_, String>(2)?,
1065 r.get::<_, String>(3)?,
1066 ))
1067 })?
1068 .collect::<Result<Vec<_>, _>>()?;
1069 Ok(rows)
1070}
1071
1072fn persist_memory_bindings(
1081 conn: &Connection,
1082 namespace: &str,
1083 memory_id: i64,
1084 entities_json: &serde_json::Value,
1085 rels_json: &serde_json::Value,
1086) -> Result<(usize, usize), AppError> {
1087 #[derive(Deserialize)]
1088 struct EntityItem {
1089 name: String,
1090 entity_type: String,
1091 }
1092 #[derive(Deserialize)]
1093 struct RelItem {
1094 source: String,
1095 target: String,
1096 relation: String,
1097 strength: f64,
1098 }
1099
1100 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1101 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1102
1103 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1104 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1105
1106 let mut ent_count = 0usize;
1107 let mut rel_count = 0usize;
1108
1109 for item in &extracted_entities {
1110 let entity_type = match item.entity_type.parse::<EntityType>() {
1111 Ok(et) => et,
1112 Err(_) => {
1113 tracing::warn!(
1114 target: "enrich",
1115 entity = %item.name,
1116 entity_type = %item.entity_type,
1117 "entity type not recognized, skipping"
1118 );
1119 continue;
1120 }
1121 };
1122 match entities::upsert_entity(
1123 conn,
1124 namespace,
1125 &NewEntity {
1126 name: item.name.clone(),
1127 entity_type,
1128 description: None,
1129 },
1130 ) {
1131 Ok(eid) => {
1132 let _ = entities::link_memory_entity(conn, memory_id, eid);
1133 ent_count += 1;
1134 }
1135 Err(e) => {
1136 tracing::warn!(
1137 target: "enrich",
1138 entity = %item.name,
1139 error = %e,
1140 "entity upsert skipped"
1141 );
1142 }
1143 }
1144 }
1145
1146 for rel in &extracted_rels {
1147 let normalized = crate::parsers::normalize_relation(&rel.relation);
1148 crate::parsers::warn_if_non_canonical(&normalized);
1149
1150 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1153 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1154 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1155 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1156 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1157 let new_rel = NewRelationship {
1158 source: rel.source.clone(),
1159 target: rel.target.clone(),
1160 relation: normalized,
1161 strength: rel.strength,
1162 description: None,
1163 };
1164 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1165 rel_count += 1;
1166 }
1167 }
1168 }
1169
1170 Ok((ent_count, rel_count))
1171}
1172
1173fn persist_entity_description(
1175 conn: &Connection,
1176 entity_id: i64,
1177 description: &str,
1178) -> Result<(), AppError> {
1179 conn.execute(
1180 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1181 rusqlite::params![description, entity_id],
1182 )?;
1183 Ok(())
1184}
1185
1186fn persist_enriched_body(
1191 conn: &Connection,
1192 namespace: &str,
1193 memory_id: i64,
1194 memory_name: &str,
1195 new_body: &str,
1196 paths: &crate::paths::AppPaths,
1197) -> Result<(), AppError> {
1198 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1200 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1201 rusqlite::params![memory_id],
1202 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1203 )?;
1204
1205 let memory_type: String = conn.query_row(
1206 "SELECT type FROM memories WHERE id=?1",
1207 rusqlite::params![memory_id],
1208 |r| r.get(0),
1209 )?;
1210
1211 let description: String = conn.query_row(
1212 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1213 rusqlite::params![memory_id],
1214 |r| r.get(0),
1215 )?;
1216
1217 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1218
1219 let new_memory = memories::NewMemory {
1220 namespace: namespace.to_string(),
1221 name: memory_name.to_string(),
1222 memory_type: memory_type.clone(),
1223 description: description.clone(),
1224 body: new_body.to_string(),
1225 body_hash,
1226 session_id: None,
1227 source: "agent".to_string(),
1228 metadata: serde_json::json!({
1229 "operation": "body-enrich",
1230 "orig_chars": old_body.chars().count(),
1231 "new_chars": new_body.chars().count(),
1232 }),
1233 };
1234
1235 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1239 let version_metadata = serde_json::json!({
1240 "operation": "body-enrich",
1241 "orig_chars": old_body.chars().count(),
1242 "new_chars": new_body.chars().count(),
1243 })
1244 .to_string();
1245 crate::storage::versions::insert_version(
1246 conn,
1247 memory_id,
1248 next_version,
1249 memory_name,
1250 &memory_type,
1251 &description,
1252 new_body,
1253 &version_metadata,
1254 Some("enrich"),
1255 "edit",
1256 )?;
1257
1258 memories::update(conn, memory_id, &new_memory, None)?;
1259 memories::sync_fts_after_update(
1260 conn,
1261 memory_id,
1262 &old_name,
1263 &old_desc,
1264 &old_body,
1265 &new_memory.name,
1266 &new_memory.description,
1267 &new_memory.body,
1268 )?;
1269
1270 let snippet: String = new_body.chars().take(200).collect();
1272 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
1273 let chunks_info = crate::chunking::split_into_chunks_hierarchical(new_body, tokenizer);
1274 let embedding_result = if chunks_info.len() <= 1 {
1275 crate::daemon::embed_passage_or_local(&paths.models, new_body)
1276 } else {
1277 let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
1278 let mut ok = true;
1279 for chunk in &chunks_info {
1280 let text = crate::chunking::chunk_text(new_body, chunk);
1281 match crate::daemon::embed_passage_or_local(&paths.models, text) {
1282 Ok(emb) => chunk_embeddings.push(emb),
1283 Err(e) => {
1284 tracing::warn!(target: "enrich", error = %e, "chunk embedding failed");
1285 ok = false;
1286 break;
1287 }
1288 }
1289 }
1290 if ok {
1291 Ok(crate::chunking::aggregate_embeddings(&chunk_embeddings))
1292 } else {
1293 crate::daemon::embed_passage_or_local(&paths.models, new_body)
1294 }
1295 };
1296
1297 if let Ok(embedding) = embedding_result {
1298 if let Err(e) = memories::upsert_vec(
1299 conn,
1300 memory_id,
1301 namespace,
1302 &memory_type,
1303 &embedding,
1304 memory_name,
1305 &snippet,
1306 ) {
1307 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1308 }
1309 }
1310
1311 Ok(())
1312}
1313
1314pub fn run(args: &EnrichArgs) -> Result<(), AppError> {
1320 let started = Instant::now();
1328
1329 let paths = AppPaths::resolve(args.db.as_deref())?;
1330 ensure_db_ready(&paths)?;
1331 let conn = open_rw(&paths.db)?;
1332 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1333
1334 let wait_secs = args.wait_job_singleton;
1340 let force_flag = args.force_job_singleton;
1341 let _singleton = crate::lock::acquire_job_singleton(
1342 crate::lock::JobType::Enrich,
1343 &namespace,
1344 &paths.db,
1345 wait_secs,
1346 force_flag,
1347 )?;
1348
1349 let _effective_mode: EnrichMode = args.mode.clone();
1351 let provider_binary = match args.mode {
1352 EnrichMode::ClaudeCode => {
1353 let bin = find_claude_binary(args.claude_binary.as_deref())?;
1354 let version = super::claude_runner::validate_claude_version(&bin)?;
1355 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
1356 emit_json(&PhaseEvent {
1357 phase: "validate",
1358 binary_path: bin.to_str(),
1359 version: Some(&version),
1360 items_total: None,
1361 items_pending: None,
1362 llm_parallelism: None,
1363 });
1364 bin
1365 }
1366 EnrichMode::Codex => {
1367 let bin = find_codex_binary(args.codex_binary.as_deref())?;
1369 emit_json(&PhaseEvent {
1370 phase: "validate",
1371 binary_path: bin.to_str(),
1372 version: None,
1373 items_total: None,
1374 items_pending: None,
1375 llm_parallelism: None,
1376 });
1377 bin
1378 }
1379 };
1380
1381 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
1385 let load = crate::system_load::load_average_one();
1386 let n = crate::system_load::ncpus();
1387 return Err(AppError::Validation(format!(
1388 "system load average {load:.2} exceeds 2x ncpus ({n}); \
1389 pass --no-max-load-check to override (not recommended)"
1390 )));
1391 }
1392
1393 if args.preflight_check && !args.dry_run {
1400 let preflight_result = run_preflight_probe(args);
1401 match preflight_result {
1402 PreflightOutcome::Healthy => {
1403 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
1404 }
1405 PreflightOutcome::RateLimited { reason, suggestion } => {
1406 if let Some(fallback) = args.fallback_mode.clone() {
1407 if fallback != args.mode {
1408 return Err(AppError::Validation(format!(
1418 "preflight detected rate limit on {mode:?}: {reason}; \
1419 re-invoke with `--mode {fallback:?}` to use the fallback provider",
1420 mode = args.mode
1421 )));
1422 }
1423 return Err(AppError::Validation(format!(
1424 "preflight detected rate limit on {mode:?}: {reason}; \
1425 --fallback-mode matches --mode, no recovery possible",
1426 mode = args.mode
1427 )));
1428 }
1429 return Err(AppError::Validation(format!(
1430 "preflight detected rate limit on {mode:?}: {reason}; \
1431 {suggestion}; pass --fallback-mode codex to recover",
1432 mode = args.mode
1433 )));
1434 }
1435 PreflightOutcome::Error(e) => {
1436 return Err(e);
1437 }
1438 }
1439 }
1440
1441 let scan_result = scan_operation(&conn, &namespace, args)?;
1443 let total = scan_result.len();
1444
1445 emit_json(&PhaseEvent {
1446 phase: "scan",
1447 binary_path: None,
1448 version: None,
1449 items_total: Some(total),
1450 items_pending: Some(total),
1451 llm_parallelism: Some(args.llm_parallelism),
1452 });
1453
1454 if args.dry_run {
1456 for (idx, key) in scan_result.iter().enumerate() {
1457 emit_json(&ItemEvent {
1458 item: key,
1459 status: "preview",
1460 memory_id: None,
1461 entity_id: None,
1462 entities: None,
1463 rels: None,
1464 chars_before: None,
1465 chars_after: None,
1466 cost_usd: None,
1467 elapsed_ms: None,
1468 error: None,
1469 index: idx,
1470 total,
1471 });
1472 }
1473 emit_json(&EnrichSummary {
1474 summary: true,
1475 operation: format!("{:?}", args.operation),
1476 items_total: total,
1477 completed: 0,
1478 failed: 0,
1479 skipped: 0,
1480 cost_usd: 0.0,
1481 elapsed_ms: started.elapsed().as_millis() as u64,
1482 });
1483 return Ok(());
1484 }
1485
1486 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1490
1491 if args.resume {
1492 let reset = queue_conn
1493 .execute(
1494 "UPDATE queue SET status='pending' WHERE status='processing'",
1495 [],
1496 )
1497 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
1498 if reset > 0 {
1499 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
1500 }
1501 }
1502
1503 if args.retry_failed {
1504 let count = queue_conn
1505 .execute(
1506 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
1507 [],
1508 )
1509 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
1510 tracing::info!(target: "enrich", count, "retrying failed items");
1511 }
1512
1513 if !args.resume && !args.retry_failed {
1514 queue_conn
1515 .execute("DELETE FROM queue", [])
1516 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
1517 }
1518
1519 for (idx, key) in scan_result.iter().enumerate() {
1521 let item_type = match args.operation {
1522 EnrichOperation::EntityDescriptions => "entity",
1523 _ => "memory",
1524 };
1525 if let Err(e) = queue_conn.execute(
1526 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
1527 rusqlite::params![key, item_type],
1528 ) {
1529 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
1530 }
1531 let _ = idx; }
1533
1534 let parallelism = args.llm_parallelism.clamp(1, 32) as usize;
1537 if parallelism > 1 {
1538 tracing::info!(
1539 target: "enrich",
1540 llm_parallelism = parallelism,
1541 "parallel LLM processing with bounded thread pool"
1542 );
1543 }
1544 if parallelism > 4 {
1548 match args.mode {
1549 EnrichMode::ClaudeCode => {
1550 tracing::warn!(
1551 target: "enrich",
1552 llm_parallelism = parallelism,
1553 recommended_max = 4,
1554 mode = "claude-code",
1555 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
1556 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
1557 to cut MCP children (G28-A)"
1558 );
1559 }
1560 EnrichMode::Codex if parallelism > 16 => {
1561 tracing::warn!(
1562 target: "enrich",
1563 llm_parallelism = parallelism,
1564 recommended_max = 16,
1565 mode = "codex",
1566 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
1567 consider --llm-parallelism 8 for safer concurrency"
1568 );
1569 }
1570 EnrichMode::Codex => {
1571 }
1575 }
1576 }
1577
1578 let mut completed = 0usize;
1579 let mut failed = 0usize;
1580 let mut skipped = 0usize;
1581 let mut cost_total = 0.0f64;
1582 let mut oauth_detected = false;
1583 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1584 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1585 let enrich_started = std::time::Instant::now();
1586
1587 let provider_timeout = match args.mode {
1588 EnrichMode::ClaudeCode => args.claude_timeout,
1589 EnrichMode::Codex => args.codex_timeout,
1590 };
1591
1592 let provider_model: Option<&str> = match args.mode {
1593 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
1594 EnrichMode::Codex => args.codex_model.as_deref(),
1595 };
1596
1597 if parallelism > 1 {
1601 let stdout_mu = parking_lot::Mutex::new(());
1602 let budget = args.max_cost_usd;
1603 let operation = args.operation.clone();
1604 let mode = args.mode.clone();
1605 let min_oc = args.min_output_chars;
1606 let max_oc = args.max_output_chars;
1607 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
1608
1609 struct WorkerResult {
1610 completed: usize,
1611 failed: usize,
1612 skipped: usize,
1613 cost: f64,
1614 oauth: bool,
1615 }
1616
1617 let results: Vec<WorkerResult> = std::thread::scope(|s| {
1618 let handles: Vec<_> = (0..parallelism)
1619 .map(|worker_id| {
1620 let stdout_mu = &stdout_mu;
1621 let paths = &paths;
1622 let namespace = &namespace;
1623 let provider_binary = &provider_binary;
1624 let operation = &operation;
1625 let mode = &mode;
1626 let prompt_tpl = prompt_tpl.as_deref();
1627 s.spawn(move || {
1628 let w_conn = match open_rw(&paths.db) {
1629 Ok(c) => c,
1630 Err(e) => {
1631 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
1632 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1633 }
1634 };
1635 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
1636 Ok(c) => c,
1637 Err(e) => {
1638 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
1639 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
1640 }
1641 };
1642 let mut w_completed = 0usize;
1643 let mut w_failed = 0usize;
1644 let mut w_skipped = 0usize;
1645 let mut w_cost = 0.0f64;
1646 let mut w_oauth = false;
1647 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1648 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
1649 let mut w_breaker = crate::retry::CircuitBreaker::new(
1655 args.circuit_breaker_threshold.max(1),
1656 std::time::Duration::from_secs(60),
1657 );
1658
1659 loop {
1660 if crate::shutdown_requested() {
1661 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
1662 break;
1663 }
1664 if let Some(b) = budget {
1665 if !w_oauth && w_cost >= b {
1666 break;
1667 }
1668 }
1669 let pending: Option<(i64, String, String)> = w_queue
1670 .query_row(
1671 "UPDATE queue SET status='processing', attempt=attempt+1 \
1672 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1673 RETURNING id, item_key, item_type",
1674 [],
1675 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1676 )
1677 .ok();
1678 let (queue_id, item_key, _item_type) = match pending {
1679 Some(p) => p,
1680 None => break,
1681 };
1682 let item_started = Instant::now();
1683 let current_index = w_completed + w_failed + w_skipped;
1684
1685 let call_result = match operation {
1686 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1687 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1688 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths),
1689 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1690 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1691 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1692 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1693 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1694 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1695 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1696 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1697 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary, provider_model, provider_timeout, mode),
1698 };
1699
1700 match call_result {
1701 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
1702 if is_oauth { w_oauth = true; }
1703 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
1704 let _ = w_queue.execute(
1705 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1706 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
1707 );
1708 w_completed += 1;
1709 if !is_oauth { w_cost += cost; }
1710 let _ = w_breaker
1712 .record(crate::retry::AttemptOutcome::Success);
1713 let _guard = stdout_mu.lock();
1714 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1715 }
1716 Ok(EnrichItemResult::Skipped { reason }) => {
1717 w_skipped += 1;
1718 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
1719 let _guard = stdout_mu.lock();
1720 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
1721 }
1722 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
1723 w_skipped += 1;
1729 let reason = format!(
1730 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
1731 );
1732 let _ = w_queue.execute(
1733 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
1734 rusqlite::params![reason, queue_id],
1735 );
1736 let _guard = stdout_mu.lock();
1737 emit_json(&ItemEvent {
1738 item: &item_key,
1739 status: "preservation_failed",
1740 memory_id: None,
1741 entity_id: None,
1742 entities: None,
1743 rels: None,
1744 chars_before: Some(chars_before),
1745 chars_after: Some(chars_after),
1746 cost_usd: None,
1747 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
1748 error: Some(reason),
1749 index: current_index,
1750 total,
1751 });
1752 }
1753 Err(e) => {
1754 let err_str = format!("{e}");
1755 if matches!(e, AppError::RateLimited { .. }) {
1756 if crate::retry::is_kill_switch_active() {
1757 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1758 } else if std::time::Instant::now() >= w_deadline {
1759 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
1760 } else {
1761 let half = w_backoff / 2;
1762 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1763 let actual_wait = half + jitter;
1764 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
1765 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
1766 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1767 w_backoff = (w_backoff * 2).min(900);
1768 continue;
1769 }
1770 }
1771 w_failed += 1;
1772 let _ = w_queue.execute("UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![err_str, queue_id]);
1773 let _guard = stdout_mu.lock();
1774 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
1775 let breaker_opened = w_breaker
1777 .record(crate::retry::AttemptOutcome::HardFailure);
1778 if breaker_opened {
1779 tracing::error!(target: "enrich",
1780 consecutive_failures = w_breaker.consecutive_failures(),
1781 "circuit breaker opened — aborting worker"
1782 );
1783 break;
1784 }
1785 }
1786 }
1787 }
1788 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
1789 })
1790 })
1791 .collect();
1792 handles
1793 .into_iter()
1794 .map(|h| {
1795 h.join().unwrap_or(WorkerResult {
1796 completed: 0,
1797 failed: 0,
1798 skipped: 0,
1799 cost: 0.0,
1800 oauth: false,
1801 })
1802 })
1803 .collect()
1804 });
1805
1806 for r in &results {
1807 completed += r.completed;
1808 failed += r.failed;
1809 skipped += r.skipped;
1810 cost_total += r.cost;
1811 oauth_detected |= r.oauth;
1812 }
1813 } else {
1814 loop {
1816 if crate::shutdown_requested() {
1817 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
1818 break;
1819 }
1820
1821 if let Some(budget) = args.max_cost_usd {
1823 if !oauth_detected && cost_total >= budget {
1824 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
1825 break;
1826 }
1827 }
1828
1829 let pending: Option<(i64, String, String)> = queue_conn
1831 .query_row(
1832 "UPDATE queue SET status='processing', attempt=attempt+1 \
1833 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
1834 RETURNING id, item_key, item_type",
1835 [],
1836 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1837 )
1838 .ok();
1839
1840 let (queue_id, item_key, item_type) = match pending {
1841 Some(p) => p,
1842 None => break,
1843 };
1844
1845 let item_started = Instant::now();
1846 let current_index = completed + failed + skipped;
1847
1848 let call_result = match args.operation {
1849 EnrichOperation::MemoryBindings => call_memory_bindings(
1850 &conn,
1851 &namespace,
1852 &item_key,
1853 &provider_binary,
1854 provider_model,
1855 provider_timeout,
1856 &args.mode,
1857 ),
1858 EnrichOperation::EntityDescriptions => call_entity_description(
1859 &conn,
1860 &namespace,
1861 &item_key,
1862 &provider_binary,
1863 provider_model,
1864 provider_timeout,
1865 &args.mode,
1866 ),
1867 EnrichOperation::BodyEnrich => call_body_enrich(
1868 &conn,
1869 &namespace,
1870 &item_key,
1871 &provider_binary,
1872 provider_model,
1873 provider_timeout,
1874 &args.mode,
1875 args.min_output_chars,
1876 args.max_output_chars,
1877 args.prompt_template.as_deref(),
1878 args.preserve_threshold,
1879 &paths,
1880 ),
1881 EnrichOperation::WeightCalibrate => call_weight_calibrate(
1882 &conn,
1883 &namespace,
1884 &item_key,
1885 &provider_binary,
1886 provider_model,
1887 provider_timeout,
1888 &args.mode,
1889 ),
1890 EnrichOperation::RelationReclassify => call_relation_reclassify(
1891 &conn,
1892 &namespace,
1893 &item_key,
1894 &provider_binary,
1895 provider_model,
1896 provider_timeout,
1897 &args.mode,
1898 ),
1899 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
1900 call_entity_connect(
1901 &conn,
1902 &namespace,
1903 &item_key,
1904 &provider_binary,
1905 provider_model,
1906 provider_timeout,
1907 &args.mode,
1908 )
1909 }
1910 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
1911 &conn,
1912 &namespace,
1913 &item_key,
1914 &provider_binary,
1915 provider_model,
1916 provider_timeout,
1917 &args.mode,
1918 ),
1919 EnrichOperation::DescriptionEnrich => call_description_enrich(
1920 &conn,
1921 &namespace,
1922 &item_key,
1923 &provider_binary,
1924 provider_model,
1925 provider_timeout,
1926 &args.mode,
1927 ),
1928 EnrichOperation::DomainClassify => call_domain_classify(
1929 &conn,
1930 &namespace,
1931 &item_key,
1932 &provider_binary,
1933 provider_model,
1934 provider_timeout,
1935 &args.mode,
1936 ),
1937 EnrichOperation::GraphAudit => call_graph_audit(
1938 &conn,
1939 &namespace,
1940 &item_key,
1941 &provider_binary,
1942 provider_model,
1943 provider_timeout,
1944 &args.mode,
1945 ),
1946 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
1947 &conn,
1948 &namespace,
1949 &item_key,
1950 &provider_binary,
1951 provider_model,
1952 provider_timeout,
1953 &args.mode,
1954 ),
1955 EnrichOperation::BodyExtract => call_body_extract(
1956 &conn,
1957 &namespace,
1958 &item_key,
1959 &provider_binary,
1960 provider_model,
1961 provider_timeout,
1962 &args.mode,
1963 ),
1964 };
1965
1966 match call_result {
1967 Ok(EnrichItemResult::Done {
1968 memory_id,
1969 entity_id,
1970 entities,
1971 rels,
1972 chars_before,
1973 chars_after,
1974 cost,
1975 is_oauth,
1976 }) => {
1977 if is_oauth && !oauth_detected {
1978 oauth_detected = true;
1979 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
1980 }
1981 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
1982
1983 let persist_err: Option<String> = match args.operation {
1985 EnrichOperation::MemoryBindings => {
1986 None
1988 }
1989 EnrichOperation::EntityDescriptions => {
1990 None
1992 }
1993 EnrichOperation::BodyEnrich => {
1994 None
1996 }
1997 _ => {
1998 None
2000 }
2001 };
2002
2003 if let Err(e) = queue_conn.execute(
2004 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2005 rusqlite::params![
2006 memory_id,
2007 entity_id,
2008 entities as i64,
2009 rels as i64,
2010 cost,
2011 item_started.elapsed().as_millis() as i64,
2012 queue_id
2013 ],
2014 ) {
2015 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2016 }
2017
2018 if persist_err.is_none() {
2019 completed += 1;
2020 if !is_oauth {
2021 cost_total += cost;
2022 }
2023 emit_json(&ItemEvent {
2024 item: &item_key,
2025 status: "done",
2026 memory_id,
2027 entity_id,
2028 entities: Some(entities),
2029 rels: Some(rels),
2030 chars_before,
2031 chars_after,
2032 cost_usd: if is_oauth { None } else { Some(cost) },
2033 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2034 error: None,
2035 index: current_index,
2036 total,
2037 });
2038 } else {
2039 failed += 1;
2040 emit_json(&ItemEvent {
2041 item: &item_key,
2042 status: "failed",
2043 memory_id: None,
2044 entity_id: None,
2045 entities: None,
2046 rels: None,
2047 chars_before: None,
2048 chars_after: None,
2049 cost_usd: None,
2050 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2051 error: persist_err,
2052 index: current_index,
2053 total,
2054 });
2055 }
2056 }
2057 Ok(EnrichItemResult::Skipped { reason }) => {
2058 skipped += 1;
2059 if let Err(e) = queue_conn.execute(
2060 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2061 rusqlite::params![reason, queue_id],
2062 ) {
2063 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2064 }
2065 emit_json(&ItemEvent {
2066 item: &item_key,
2067 status: "skipped",
2068 memory_id: None,
2069 entity_id: None,
2070 entities: None,
2071 rels: None,
2072 chars_before: None,
2073 chars_after: None,
2074 cost_usd: None,
2075 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2076 error: None,
2077 index: current_index,
2078 total,
2079 });
2080 }
2081 Ok(EnrichItemResult::PreservationFailed {
2082 score,
2083 threshold,
2084 chars_before,
2085 chars_after,
2086 }) => {
2087 skipped += 1;
2094 let reason = format!(
2095 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2096 );
2097 if let Err(qe) = queue_conn.execute(
2098 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2099 rusqlite::params![reason, queue_id],
2100 ) {
2101 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2102 }
2103 emit_json(&ItemEvent {
2104 item: &item_key,
2105 status: "preservation_failed",
2106 memory_id: None,
2107 entity_id: None,
2108 entities: None,
2109 rels: None,
2110 chars_before: Some(chars_before),
2111 chars_after: Some(chars_after),
2112 cost_usd: None,
2113 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2114 error: Some(reason),
2115 index: current_index,
2116 total,
2117 });
2118 }
2119 Err(e) => {
2120 let err_str = format!("{e}");
2121 if matches!(e, AppError::RateLimited { .. }) {
2122 if crate::retry::is_kill_switch_active() {
2123 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2124 } else if std::time::Instant::now() >= rate_limit_deadline {
2125 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2126 } else {
2127 let half = backoff_secs / 2;
2128 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2129 let actual_wait = half + jitter;
2130 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2131 if let Err(qe) = queue_conn.execute(
2132 "UPDATE queue SET status='pending' WHERE id=?1",
2133 rusqlite::params![queue_id],
2134 ) {
2135 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2136 }
2137 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2138 backoff_secs = (backoff_secs * 2).min(900);
2139 continue;
2140 }
2141 }
2142
2143 failed += 1;
2144 if let Err(qe) = queue_conn.execute(
2145 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
2146 rusqlite::params![err_str, queue_id],
2147 ) {
2148 tracing::warn!(target: "enrich", error = %qe, "queue failed update failed");
2149 }
2150 emit_json(&ItemEvent {
2151 item: &item_key,
2152 status: "failed",
2153 memory_id: None,
2154 entity_id: None,
2155 entities: None,
2156 rels: None,
2157 chars_before: None,
2158 chars_after: None,
2159 cost_usd: None,
2160 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2161 error: Some(err_str),
2162 index: current_index,
2163 total,
2164 });
2165 }
2166 }
2167
2168 let _ = item_type; }
2170 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2173 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2174
2175 emit_json(&EnrichSummary {
2176 summary: true,
2177 operation: format!("{:?}", args.operation),
2178 items_total: total,
2179 completed,
2180 failed,
2181 skipped,
2182 cost_usd: cost_total,
2183 elapsed_ms: started.elapsed().as_millis() as u64,
2184 });
2185
2186 if failed == 0 {
2187 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
2188 }
2189
2190 Ok(())
2191}
2192
/// Outcome of enriching a single queue item, as returned by the `call_*` handlers.
#[derive(Debug)]
enum EnrichItemResult {
    /// The provider call succeeded and the handler persisted its results.
    Done {
        /// Memory the item resolved to, for memory-scoped operations.
        memory_id: Option<i64>,
        /// Entity the item resolved to, for entity-scoped operations.
        entity_id: Option<i64>,
        /// Number of entities the handler reports as written.
        entities: usize,
        /// Number of relationships the handler reports as written.
        rels: usize,
        /// Body length before enrichment, when the operation rewrites text.
        chars_before: Option<usize>,
        /// Body length after enrichment, when the operation rewrites text.
        chars_after: Option<usize>,
        /// Provider-reported cost in USD; only counted toward the budget when
        /// `is_oauth` is false.
        cost: f64,
        /// True when the provider call was billed via an OAuth subscription;
        /// cost is then omitted from the emitted events.
        is_oauth: bool,
    },
    /// The item was intentionally not processed. `reason` is stored in the
    /// queue's `error` column.
    Skipped {
        reason: String,
    },
    /// The rewritten body scored below the content-preservation threshold.
    /// The runner records this as a skipped item, reporting the Jaccard score.
    PreservationFailed {
        /// Preservation score obtained by the candidate body.
        score: f64,
        /// Minimum score that would have been accepted.
        threshold: f64,
        /// Character count of the original body.
        chars_before: usize,
        /// Character count of the candidate body.
        chars_after: usize,
    },
}
2222
2223fn call_memory_bindings(
2228 conn: &Connection,
2229 namespace: &str,
2230 memory_name: &str,
2231 binary: &Path,
2232 model: Option<&str>,
2233 timeout: u64,
2234 mode: &EnrichMode,
2235) -> Result<EnrichItemResult, AppError> {
2236 let (memory_id, body): (i64, String) = conn.query_row(
2238 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
2239 rusqlite::params![namespace, memory_name],
2240 |r| Ok((r.get(0)?, r.get(1)?)),
2241 ).map_err(|e| match e {
2242 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
2243 other => AppError::Database(other),
2244 })?;
2245
2246 if body.trim().is_empty() {
2247 return Ok(EnrichItemResult::Skipped {
2248 reason: "body is empty".to_string(),
2249 });
2250 }
2251
2252 let (value, cost, is_oauth) = match mode {
2253 EnrichMode::ClaudeCode => call_claude(
2254 binary,
2255 BINDINGS_PROMPT,
2256 BINDINGS_SCHEMA,
2257 &body,
2258 model,
2259 timeout,
2260 )?,
2261 EnrichMode::Codex => call_codex(
2262 binary,
2263 BINDINGS_PROMPT,
2264 BINDINGS_SCHEMA,
2265 &body,
2266 model,
2267 timeout,
2268 )?,
2269 };
2270
2271 let empty_arr = serde_json::Value::Array(vec![]);
2272 let entities_val = value.get("entities").unwrap_or(&empty_arr);
2273 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
2274
2275 let (ent_count, rel_count) =
2276 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
2277
2278 Ok(EnrichItemResult::Done {
2279 memory_id: Some(memory_id),
2280 entity_id: None,
2281 entities: ent_count,
2282 rels: rel_count,
2283 chars_before: None,
2284 chars_after: None,
2285 cost,
2286 is_oauth,
2287 })
2288}
2289
2290fn call_entity_description(
2291 conn: &Connection,
2292 namespace: &str,
2293 entity_name: &str,
2294 binary: &Path,
2295 model: Option<&str>,
2296 timeout: u64,
2297 mode: &EnrichMode,
2298) -> Result<EnrichItemResult, AppError> {
2299 let (entity_id, entity_type): (i64, String) = conn
2300 .query_row(
2301 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
2302 rusqlite::params![namespace, entity_name],
2303 |r| Ok((r.get(0)?, r.get(1)?)),
2304 )
2305 .map_err(|e| match e {
2306 rusqlite::Error::QueryReturnedNoRows => {
2307 AppError::NotFound(format!("entity '{entity_name}' not found"))
2308 }
2309 other => AppError::Database(other),
2310 })?;
2311
2312 let prompt = format!(
2313 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
2314 );
2315
2316 let (value, cost, is_oauth) = match mode {
2317 EnrichMode::ClaudeCode => call_claude(
2318 binary,
2319 &prompt,
2320 ENTITY_DESCRIPTION_SCHEMA,
2321 "",
2322 model,
2323 timeout,
2324 )?,
2325 EnrichMode::Codex => call_codex(
2326 binary,
2327 &prompt,
2328 ENTITY_DESCRIPTION_SCHEMA,
2329 "",
2330 model,
2331 timeout,
2332 )?,
2333 };
2334
2335 let description = value
2336 .get("description")
2337 .and_then(|v| v.as_str())
2338 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
2339
2340 persist_entity_description(conn, entity_id, description)?;
2341
2342 Ok(EnrichItemResult::Done {
2343 memory_id: None,
2344 entity_id: Some(entity_id),
2345 entities: 0,
2346 rels: 0,
2347 chars_before: None,
2348 chars_after: None,
2349 cost,
2350 is_oauth,
2351 })
2352}
2353
/// Rewrites a memory body into a longer, enriched version with the configured LLM CLI.
///
/// Steps, in order (the order decides which outcome is reported):
/// 1. Load the live memory `(namespace, memory_name)`; a missing row is
///    `AppError::NotFound`, other lookup failures are `AppError::Database`.
/// 2. Collect up to 10 linked entity names (rows that fail to decode are dropped).
/// 3. Take the prompt prefix from `prompt_template` or from `BODY_ENRICH_PROMPT_PREFIX`.
///    A template whose file size exceeds `MAX_MEMORY_BODY_LEN` bytes is rejected with
///    `AppError::LimitExceeded`.
/// 4. Append a context section (only when there is a description or linked entities)
///    and the target length range, then send the original body as the input text.
/// 5. Require an `enriched_body` string in the result (`AppError::Validation` otherwise).
/// 6. Return `PreservationFailed` when `PreservationVerdict::evaluate` does not accept it.
/// 7. Return `Skipped` when the blake3 hash is unchanged, or when the result is not
///    strictly longer (in chars) than the original.
/// 8. Otherwise store it with `persist_enriched_body` and return `Done` with char counts.
///
/// NOTE(review): `min_output_chars` / `max_output_chars` are only stated in the prompt;
/// the returned body is not checked against them here.
#[allow(clippy::too_many_arguments)]
fn call_body_enrich(
    conn: &Connection,
    namespace: &str,
    memory_name: &str,
    binary: &Path,
    model: Option<&str>,
    timeout: u64,
    mode: &EnrichMode,
    min_output_chars: usize,
    max_output_chars: usize,
    prompt_template: Option<&Path>,
    preserve_threshold: f64,
    paths: &crate::paths::AppPaths,
) -> Result<EnrichItemResult, AppError> {
    let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
        .query_row(
            "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
             FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
            rusqlite::params![namespace, memory_name],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
        )
        .map_err(|e| match e {
            rusqlite::Error::QueryReturnedNoRows => {
                AppError::NotFound(format!("memory '{memory_name}' not found"))
            }
            other => AppError::Database(other),
        })?;

    // Lengths are measured in chars (not bytes) throughout this function.
    let chars_before = body.chars().count();

    // Best-effort context: decode failures are ignored rather than aborting the item.
    let linked_entities: Vec<String> = {
        let mut stmt = conn.prepare_cached(
            "SELECT e.name FROM memory_entities me \
             JOIN entities e ON e.id = me.entity_id \
             WHERE me.memory_id = ?1 LIMIT 10",
        )?;
        let result: Vec<String> = stmt
            .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
            .filter_map(|r| r.ok())
            .collect();
        drop(stmt);
        result
    };

    let prompt_prefix = if let Some(tmpl_path) = prompt_template {
        // Check the size before reading so an oversized template is never loaded.
        let file_size = std::fs::metadata(tmpl_path)
            .map_err(|e| {
                AppError::Io(std::io::Error::new(
                    e.kind(),
                    format!("failed to stat prompt template: {e}"),
                ))
            })?
            .len();
        if file_size > MAX_MEMORY_BODY_LEN as u64 {
            return Err(AppError::LimitExceeded(
                crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
            ));
        }
        std::fs::read_to_string(tmpl_path).map_err(|e| {
            AppError::Io(std::io::Error::new(
                e.kind(),
                format!("failed to read prompt template: {e}"),
            ))
        })?
    } else {
        BODY_ENRICH_PROMPT_PREFIX.to_string()
    };

    // The context block is emitted only when there is something beyond name/type/domain.
    let context_section = if !linked_entities.is_empty() || !description.is_empty() {
        let mut ctx = String::new();
        ctx.push_str(&format!(
            "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
        ));
        if !description.is_empty() {
            ctx.push_str(&format!("- Description: {description}\n"));
        }
        ctx.push_str(&format!("- Domain: {namespace}\n"));
        if !linked_entities.is_empty() {
            ctx.push_str(&format!(
                "- Linked entities: {}\n",
                linked_entities.join(", ")
            ));
        }
        ctx
    } else {
        String::new()
    };

    let prompt = format!(
        "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
    );

    // The original body is sent as the input text; the prompt carries instructions.
    let (value, cost, is_oauth) = match mode {
        EnrichMode::ClaudeCode => {
            call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
        }
        EnrichMode::Codex => {
            call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
        }
    };

    let enriched_body = value
        .get("enriched_body")
        .and_then(|v| v.as_str())
        .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;

    let chars_after = enriched_body.chars().count();

    let threshold = preserve_threshold;
    // Guard against rewrites that drop content of the original body.
    let verdict =
        crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
    if !verdict.is_accepted() {
        return Ok(EnrichItemResult::PreservationFailed {
            score: match verdict {
                crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
                crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
                // An unchanged body is reported with a perfect score.
                crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
            },
            threshold,
            chars_before,
            chars_after,
        });
    }

    // Idempotency: re-running on an already-enriched body must not rewrite it.
    let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
    let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
    if old_hash == new_hash {
        return Ok(EnrichItemResult::Skipped {
            reason: format!(
                "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
            ),
        });
    }

    // Enrichment must strictly grow the body; otherwise keep the original.
    if chars_after <= chars_before {
        return Ok(EnrichItemResult::Skipped {
            reason: format!(
                "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
            ),
        });
    }

    persist_enriched_body(
        conn,
        namespace,
        memory_id,
        memory_name,
        enriched_body,
        paths,
    )?;

    Ok(EnrichItemResult::Done {
        memory_id: Some(memory_id),
        entity_id: None,
        entities: 0,
        rels: 0,
        chars_before: Some(chars_before),
        chars_after: Some(chars_after),
        cost,
        is_oauth,
    })
}
2533
2534fn scan_operation(
2539 conn: &Connection,
2540 namespace: &str,
2541 args: &EnrichArgs,
2542) -> Result<Vec<String>, AppError> {
2543 let name_filter = resolve_name_filter(args)?;
2545 match args.operation {
2546 EnrichOperation::MemoryBindings => {
2547 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
2548 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2549 }
2550 EnrichOperation::EntityDescriptions => {
2551 let rows = scan_entities_without_description(conn, namespace, args.limit)?;
2552 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2553 }
2554 EnrichOperation::BodyEnrich => {
2555 let rows =
2556 scan_short_body_memories(conn, namespace, args.min_output_chars, args.limit)?;
2557 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2558 }
2559 EnrichOperation::WeightCalibrate => {
2560 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
2561 Ok(rows
2562 .into_iter()
2563 .map(|(id, _, _, _, _)| id.to_string())
2564 .collect())
2565 }
2566 EnrichOperation::RelationReclassify => {
2567 let rows = scan_generic_relations(conn, namespace, args.limit)?;
2568 Ok(rows
2569 .into_iter()
2570 .map(|(id, _, _, _)| id.to_string())
2571 .collect())
2572 }
2573 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2574 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
2575 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
2576 }
2577 EnrichOperation::EntityTypeValidate => {
2578 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
2579 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2580 }
2581 EnrichOperation::DescriptionEnrich => {
2582 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
2583 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
2584 }
2585 EnrichOperation::DomainClassify
2586 | EnrichOperation::GraphAudit
2587 | EnrichOperation::DeepResearchSynth
2588 | EnrichOperation::BodyExtract => {
2589 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
2590 let sql = format!(
2591 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
2592 );
2593 let mut stmt = conn.prepare(&sql)?;
2594 let names = stmt
2595 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
2596 .collect::<Result<Vec<_>, _>>()?;
2597 Ok(names)
2598 }
2599 }
2600}
2601
2602fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
2608 if let Some(p) = explicit {
2609 if p.exists() {
2610 return Ok(p.to_path_buf());
2611 }
2612 return Err(AppError::Validation(format!(
2613 "Codex binary not found at explicit path: {}",
2614 p.display()
2615 )));
2616 }
2617
2618 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
2619 let p = PathBuf::from(&env_path);
2620 if p.exists() {
2621 return Ok(p);
2622 }
2623 }
2624
2625 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
2626 if let Some(path_var) = std::env::var_os("PATH") {
2627 for dir in std::env::split_paths(&path_var) {
2628 let candidate = dir.join(name);
2629 if candidate.exists() {
2630 return Ok(candidate);
2631 }
2632 }
2633 }
2634
2635 Err(AppError::Validation(
2636 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
2637 ))
2638}
2639
2640fn call_weight_calibrate(
2642 conn: &Connection,
2643 _namespace: &str,
2644 item_key: &str,
2645 binary: &Path,
2646 model: Option<&str>,
2647 timeout: u64,
2648 mode: &EnrichMode,
2649) -> Result<EnrichItemResult, AppError> {
2650 let rel_id: i64 = item_key
2651 .parse()
2652 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2653 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
2654 .query_row(
2655 "SELECT e1.name, e2.name, r.relation, r.weight \
2656 FROM relationships r \
2657 JOIN entities e1 ON e1.id = r.source_id \
2658 JOIN entities e2 ON e2.id = r.target_id \
2659 WHERE r.id = ?1",
2660 rusqlite::params![rel_id],
2661 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
2662 )
2663 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2664
2665 let input_text = format!(
2666 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
2667 );
2668 let (value, cost, is_oauth) = match mode {
2669 EnrichMode::ClaudeCode => call_claude(
2670 binary,
2671 WEIGHT_CALIBRATE_PROMPT,
2672 WEIGHT_CALIBRATE_SCHEMA,
2673 &input_text,
2674 model,
2675 timeout,
2676 )?,
2677 EnrichMode::Codex => call_codex(
2678 binary,
2679 WEIGHT_CALIBRATE_PROMPT,
2680 WEIGHT_CALIBRATE_SCHEMA,
2681 &input_text,
2682 model,
2683 timeout,
2684 )?,
2685 };
2686
2687 let calibrated = value
2688 .get("calibrated_weight")
2689 .and_then(|v| v.as_f64())
2690 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
2691
2692 conn.execute(
2693 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
2694 rusqlite::params![calibrated, rel_id],
2695 )?;
2696
2697 Ok(EnrichItemResult::Done {
2698 memory_id: None,
2699 entity_id: None,
2700 entities: 0,
2701 rels: 1,
2702 chars_before: None,
2703 chars_after: None,
2704 cost,
2705 is_oauth,
2706 })
2707}
2708
2709fn call_relation_reclassify(
2711 conn: &Connection,
2712 _namespace: &str,
2713 item_key: &str,
2714 binary: &Path,
2715 model: Option<&str>,
2716 timeout: u64,
2717 mode: &EnrichMode,
2718) -> Result<EnrichItemResult, AppError> {
2719 let rel_id: i64 = item_key
2720 .parse()
2721 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
2722 let (source_name, target_name, current_relation): (String, String, String) = conn
2723 .query_row(
2724 "SELECT e1.name, e2.name, r.relation \
2725 FROM relationships r \
2726 JOIN entities e1 ON e1.id = r.source_id \
2727 JOIN entities e2 ON e2.id = r.target_id \
2728 WHERE r.id = ?1",
2729 rusqlite::params![rel_id],
2730 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2731 )
2732 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
2733
2734 let input_text = format!(
2735 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
2736 );
2737 let (value, cost, is_oauth) = match mode {
2738 EnrichMode::ClaudeCode => call_claude(
2739 binary,
2740 RELATION_RECLASSIFY_PROMPT,
2741 RELATION_RECLASSIFY_SCHEMA,
2742 &input_text,
2743 model,
2744 timeout,
2745 )?,
2746 EnrichMode::Codex => call_codex(
2747 binary,
2748 RELATION_RECLASSIFY_PROMPT,
2749 RELATION_RECLASSIFY_SCHEMA,
2750 &input_text,
2751 model,
2752 timeout,
2753 )?,
2754 };
2755
2756 let new_relation = value
2757 .get("relation")
2758 .and_then(|v| v.as_str())
2759 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
2760 let new_strength = value
2761 .get("strength")
2762 .and_then(|v| v.as_f64())
2763 .unwrap_or(0.5);
2764
2765 conn.execute(
2766 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
2767 rusqlite::params![new_relation, new_strength, rel_id],
2768 )?;
2769
2770 Ok(EnrichItemResult::Done {
2771 memory_id: None,
2772 entity_id: None,
2773 entities: 0,
2774 rels: 1,
2775 chars_before: None,
2776 chars_after: None,
2777 cost,
2778 is_oauth,
2779 })
2780}
2781
2782fn call_entity_connect(
2784 conn: &Connection,
2785 namespace: &str,
2786 item_key: &str,
2787 binary: &Path,
2788 model: Option<&str>,
2789 timeout: u64,
2790 mode: &EnrichMode,
2791) -> Result<EnrichItemResult, AppError> {
2792 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
2793 let (e1_id, e1_name, e2_id, e2_name) =
2794 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
2795 Some(p) => p,
2796 None => {
2797 return Ok(EnrichItemResult::Skipped {
2798 reason: "pair no longer isolated".into(),
2799 })
2800 }
2801 };
2802 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
2803 let (value, cost, is_oauth) = match mode {
2804 EnrichMode::ClaudeCode => call_claude(
2805 binary,
2806 ENTITY_CONNECT_PROMPT,
2807 ENTITY_CONNECT_SCHEMA,
2808 &input_text,
2809 model,
2810 timeout,
2811 )?,
2812 EnrichMode::Codex => call_codex(
2813 binary,
2814 ENTITY_CONNECT_PROMPT,
2815 ENTITY_CONNECT_SCHEMA,
2816 &input_text,
2817 model,
2818 timeout,
2819 )?,
2820 };
2821 let relation = value
2822 .get("relation")
2823 .and_then(|v| v.as_str())
2824 .unwrap_or("none");
2825 if relation == "none" {
2826 return Ok(EnrichItemResult::Skipped {
2827 reason: "LLM determined no relationship".into(),
2828 });
2829 }
2830 let strength = value
2831 .get("strength")
2832 .and_then(|v| v.as_f64())
2833 .unwrap_or(0.5);
2834 conn.execute(
2835 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
2836 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
2837 )?;
2838 Ok(EnrichItemResult::Done {
2839 memory_id: None,
2840 entity_id: None,
2841 entities: 0,
2842 rels: 1,
2843 chars_before: None,
2844 chars_after: None,
2845 cost,
2846 is_oauth,
2847 })
2848}
2849
2850fn call_entity_type_validate(
2852 conn: &Connection,
2853 _namespace: &str,
2854 item_key: &str,
2855 binary: &Path,
2856 model: Option<&str>,
2857 timeout: u64,
2858 mode: &EnrichMode,
2859) -> Result<EnrichItemResult, AppError> {
2860 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
2861 .query_row(
2862 "SELECT id, name, type FROM entities WHERE name = ?1",
2863 rusqlite::params![item_key],
2864 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2865 )
2866 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
2867 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
2868 let (value, cost, is_oauth) = match mode {
2869 EnrichMode::ClaudeCode => call_claude(
2870 binary,
2871 ENTITY_TYPE_VALIDATE_PROMPT,
2872 ENTITY_TYPE_VALIDATE_SCHEMA,
2873 &input_text,
2874 model,
2875 timeout,
2876 )?,
2877 EnrichMode::Codex => call_codex(
2878 binary,
2879 ENTITY_TYPE_VALIDATE_PROMPT,
2880 ENTITY_TYPE_VALIDATE_SCHEMA,
2881 &input_text,
2882 model,
2883 timeout,
2884 )?,
2885 };
2886 let validated_type = value
2887 .get("validated_type")
2888 .and_then(|v| v.as_str())
2889 .unwrap_or(&ent_type);
2890 let was_correct = value
2891 .get("was_correct")
2892 .and_then(|v| v.as_bool())
2893 .unwrap_or(true);
2894 if !was_correct {
2895 conn.execute(
2896 "UPDATE entities SET type = ?1 WHERE id = ?2",
2897 rusqlite::params![validated_type, ent_id],
2898 )?;
2899 }
2900 Ok(EnrichItemResult::Done {
2901 memory_id: None,
2902 entity_id: Some(ent_id),
2903 entities: 1,
2904 rels: 0,
2905 chars_before: None,
2906 chars_after: None,
2907 cost,
2908 is_oauth,
2909 })
2910}
2911
2912fn call_description_enrich(
2914 conn: &Connection,
2915 _namespace: &str,
2916 item_key: &str,
2917 binary: &Path,
2918 model: Option<&str>,
2919 timeout: u64,
2920 mode: &EnrichMode,
2921) -> Result<EnrichItemResult, AppError> {
2922 let (mem_id, body, old_desc): (i64, String, String) = conn
2923 .query_row(
2924 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2925 rusqlite::params![item_key],
2926 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2927 )
2928 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2929 let snippet: String = body.chars().take(500).collect();
2930 let input_text = format!(
2931 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
2932 );
2933 let (value, cost, is_oauth) = match mode {
2934 EnrichMode::ClaudeCode => call_claude(
2935 binary,
2936 DESCRIPTION_ENRICH_PROMPT,
2937 DESCRIPTION_ENRICH_SCHEMA,
2938 &input_text,
2939 model,
2940 timeout,
2941 )?,
2942 EnrichMode::Codex => call_codex(
2943 binary,
2944 DESCRIPTION_ENRICH_PROMPT,
2945 DESCRIPTION_ENRICH_SCHEMA,
2946 &input_text,
2947 model,
2948 timeout,
2949 )?,
2950 };
2951 let new_desc = value
2952 .get("description")
2953 .and_then(|v| v.as_str())
2954 .unwrap_or(&old_desc);
2955 conn.execute(
2956 "UPDATE memories SET description = ?1 WHERE id = ?2",
2957 rusqlite::params![new_desc, mem_id],
2958 )?;
2959 Ok(EnrichItemResult::Done {
2960 memory_id: Some(mem_id),
2961 entity_id: None,
2962 entities: 0,
2963 rels: 0,
2964 chars_before: Some(old_desc.len()),
2965 chars_after: Some(new_desc.len()),
2966 cost,
2967 is_oauth,
2968 })
2969}
2970
2971fn call_domain_classify(
2973 conn: &Connection,
2974 _namespace: &str,
2975 item_key: &str,
2976 binary: &Path,
2977 model: Option<&str>,
2978 timeout: u64,
2979 mode: &EnrichMode,
2980) -> Result<EnrichItemResult, AppError> {
2981 let (mem_id, body, desc): (i64, String, String) = conn
2982 .query_row(
2983 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
2984 rusqlite::params![item_key],
2985 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
2986 )
2987 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
2988 let snippet: String = body.chars().take(500).collect();
2989 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
2990 let (value, cost, is_oauth) = match mode {
2991 EnrichMode::ClaudeCode => call_claude(
2992 binary,
2993 DOMAIN_CLASSIFY_PROMPT,
2994 DOMAIN_CLASSIFY_SCHEMA,
2995 &input_text,
2996 model,
2997 timeout,
2998 )?,
2999 EnrichMode::Codex => call_codex(
3000 binary,
3001 DOMAIN_CLASSIFY_PROMPT,
3002 DOMAIN_CLASSIFY_SCHEMA,
3003 &input_text,
3004 model,
3005 timeout,
3006 )?,
3007 };
3008 let domain = value
3009 .get("domain")
3010 .and_then(|v| v.as_str())
3011 .unwrap_or("uncategorized");
3012 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
3013 conn.execute(
3014 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
3015 rusqlite::params![metadata, mem_id],
3016 )?;
3017 Ok(EnrichItemResult::Done {
3018 memory_id: Some(mem_id),
3019 entity_id: None,
3020 entities: 0,
3021 rels: 0,
3022 chars_before: None,
3023 chars_after: None,
3024 cost,
3025 is_oauth,
3026 })
3027}
3028
3029fn call_graph_audit(
3031 conn: &Connection,
3032 _namespace: &str,
3033 item_key: &str,
3034 binary: &Path,
3035 model: Option<&str>,
3036 timeout: u64,
3037 mode: &EnrichMode,
3038) -> Result<EnrichItemResult, AppError> {
3039 let (mem_id, body, desc): (i64, String, String) = conn
3040 .query_row(
3041 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3042 rusqlite::params![item_key],
3043 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3044 )
3045 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3046 let snippet: String = body.chars().take(500).collect();
3047 let ent_count: i64 = conn
3048 .query_row(
3049 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
3050 rusqlite::params![mem_id],
3051 |r| r.get(0),
3052 )
3053 .unwrap_or(0);
3054 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
3055 let (value, cost, is_oauth) = match mode {
3056 EnrichMode::ClaudeCode => call_claude(
3057 binary,
3058 GRAPH_AUDIT_PROMPT,
3059 GRAPH_AUDIT_SCHEMA,
3060 &input_text,
3061 model,
3062 timeout,
3063 )?,
3064 EnrichMode::Codex => call_codex(
3065 binary,
3066 GRAPH_AUDIT_PROMPT,
3067 GRAPH_AUDIT_SCHEMA,
3068 &input_text,
3069 model,
3070 timeout,
3071 )?,
3072 };
3073 let issues = value
3074 .get("issues")
3075 .and_then(|v| v.as_array())
3076 .map(|a| a.len())
3077 .unwrap_or(0);
3078 Ok(EnrichItemResult::Done {
3079 memory_id: Some(mem_id),
3080 entity_id: None,
3081 entities: 0,
3082 rels: issues,
3083 chars_before: None,
3084 chars_after: None,
3085 cost,
3086 is_oauth,
3087 })
3088}
3089
3090fn call_deep_research_synth(
3092 conn: &Connection,
3093 namespace: &str,
3094 item_key: &str,
3095 binary: &Path,
3096 model: Option<&str>,
3097 timeout: u64,
3098 mode: &EnrichMode,
3099) -> Result<EnrichItemResult, AppError> {
3100 let (mem_id, body): (i64, String) = conn
3101 .query_row(
3102 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3103 rusqlite::params![item_key],
3104 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3105 )
3106 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3107 let snippet: String = body.chars().take(2000).collect();
3108 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
3109 let (value, cost, is_oauth) = match mode {
3110 EnrichMode::ClaudeCode => call_claude(
3111 binary,
3112 DEEP_RESEARCH_SYNTH_PROMPT,
3113 DEEP_RESEARCH_SYNTH_SCHEMA,
3114 &input_text,
3115 model,
3116 timeout,
3117 )?,
3118 EnrichMode::Codex => call_codex(
3119 binary,
3120 DEEP_RESEARCH_SYNTH_PROMPT,
3121 DEEP_RESEARCH_SYNTH_SCHEMA,
3122 &input_text,
3123 model,
3124 timeout,
3125 )?,
3126 };
3127 let mut ent_count = 0usize;
3128 let mut rel_count = 0usize;
3129 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
3130 for e in ents {
3131 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
3132 let etype_str = e
3133 .get("entity_type")
3134 .and_then(|v| v.as_str())
3135 .unwrap_or("concept");
3136 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
3137 if name.len() >= 2 {
3138 let ne = NewEntity {
3139 name: name.to_string(),
3140 entity_type: etype,
3141 description: None,
3142 };
3143 let _ = entities::upsert_entity(conn, namespace, &ne);
3144 ent_count += 1;
3145 }
3146 }
3147 }
3148 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
3149 for r in rels {
3150 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
3151 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
3152 if src.is_empty() || tgt.is_empty() {
3153 continue;
3154 }
3155 let rel = r
3156 .get("relation")
3157 .and_then(|v| v.as_str())
3158 .unwrap_or("related");
3159 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
3160 if let (Some(sid), Some(tid)) = (
3161 entities::find_entity_id(conn, namespace, src)?,
3162 entities::find_entity_id(conn, namespace, tgt)?,
3163 ) {
3164 let _ = entities::create_or_fetch_relationship(
3165 conn, namespace, sid, tid, rel, str_, None,
3166 );
3167 rel_count += 1;
3168 }
3169 }
3170 }
3171 Ok(EnrichItemResult::Done {
3172 memory_id: Some(mem_id),
3173 entity_id: None,
3174 entities: ent_count,
3175 rels: rel_count,
3176 chars_before: None,
3177 chars_after: None,
3178 cost,
3179 is_oauth,
3180 })
3181}
3182
3183fn call_body_extract(
3185 conn: &Connection,
3186 _namespace: &str,
3187 item_key: &str,
3188 binary: &Path,
3189 model: Option<&str>,
3190 timeout: u64,
3191 mode: &EnrichMode,
3192) -> Result<EnrichItemResult, AppError> {
3193 let (mem_id, body): (i64, String) = conn
3194 .query_row(
3195 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3196 rusqlite::params![item_key],
3197 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
3198 )
3199 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3200 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
3201 let (value, cost, is_oauth) = match mode {
3202 EnrichMode::ClaudeCode => call_claude(
3203 binary,
3204 BODY_EXTRACT_PROMPT,
3205 BODY_EXTRACT_SCHEMA,
3206 &input_text,
3207 model,
3208 timeout,
3209 )?,
3210 EnrichMode::Codex => call_codex(
3211 binary,
3212 BODY_EXTRACT_PROMPT,
3213 BODY_EXTRACT_SCHEMA,
3214 &input_text,
3215 model,
3216 timeout,
3217 )?,
3218 };
3219 let restructured = value
3220 .get("restructured_body")
3221 .and_then(|v| v.as_str())
3222 .unwrap_or(&body);
3223 let chars_before = body.len();
3224 let chars_after = restructured.len();
3225 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
3226 conn.execute(
3227 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
3228 rusqlite::params![restructured, new_hash, mem_id],
3229 )?;
3230 Ok(EnrichItemResult::Done {
3231 memory_id: Some(mem_id),
3232 entity_id: None,
3233 entities: 0,
3234 rels: 0,
3235 chars_before: Some(chars_before),
3236 chars_after: Some(chars_after),
3237 cost,
3238 is_oauth,
3239 })
3240}
3241
3242#[allow(clippy::type_complexity)]
3244fn scan_isolated_entity_pairs(
3245 conn: &Connection,
3246 namespace: &str,
3247 limit: Option<usize>,
3248) -> Result<Vec<(i64, String, i64, String)>, AppError> {
3249 let limit_val = limit.unwrap_or(50) as i64;
3250 let mut stmt = conn.prepare_cached(
3251 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
3252 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
3253 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
3254 (r.source_id = e1.id AND r.target_id = e2.id) OR \
3255 (r.source_id = e2.id AND r.target_id = e1.id)) \
3256 LIMIT ?2",
3257 )?;
3258 let rows = stmt
3259 .query_map(rusqlite::params![namespace, limit_val], |r| {
3260 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
3261 })?
3262 .collect::<Result<Vec<_>, _>>()?;
3263 Ok(rows)
3264}
3265
3266fn scan_entities_for_type_validation(
3268 conn: &Connection,
3269 namespace: &str,
3270 limit: Option<usize>,
3271) -> Result<Vec<(i64, String, String)>, AppError> {
3272 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3273 let sql = format!(
3274 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
3275 );
3276 let mut stmt = conn.prepare(&sql)?;
3277 let rows = stmt
3278 .query_map(rusqlite::params![namespace], |r| {
3279 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3280 })?
3281 .collect::<Result<Vec<_>, _>>()?;
3282 Ok(rows)
3283}
3284
3285fn scan_generic_descriptions(
3287 conn: &Connection,
3288 namespace: &str,
3289 limit: Option<usize>,
3290) -> Result<Vec<(i64, String, String)>, AppError> {
3291 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3292 let sql = format!(
3293 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
3294 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
3295 ORDER BY id {limit_clause}"
3296 );
3297 let mut stmt = conn.prepare(&sql)?;
3298 let rows = stmt
3299 .query_map(rusqlite::params![namespace], |r| {
3300 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
3301 })?
3302 .collect::<Result<Vec<_>, _>>()?;
3303 Ok(rows)
3304}
3305
3306fn call_codex(
3310 binary: &Path,
3311 prompt: &str,
3312 json_schema: &str,
3313 input_text: &str,
3314 model: Option<&str>,
3315 timeout_secs: u64,
3316) -> Result<(serde_json::Value, f64, bool), AppError> {
3317 use wait_timeout::ChildExt;
3318
3319 super::codex_spawn::validate_codex_model(model)?;
3324 let schema_file = super::codex_spawn::trusted_schema_path()?;
3325
3326 let args = super::codex_spawn::CodexSpawnArgs {
3327 binary,
3328 prompt,
3329 json_schema,
3330 input_text,
3331 model,
3332 timeout_secs,
3333 schema_path: schema_file.clone(),
3334 };
3335 let mut cmd = super::codex_spawn::build_codex_command(&args);
3336
3337 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
3338 AppError::Io(std::io::Error::new(
3339 e.kind(),
3340 format!("failed to spawn codex: {e}"),
3341 ))
3342 })?;
3343
3344 let full_prompt = format!("{prompt}\n\n{input_text}");
3345 let stdin_bytes = full_prompt.into_bytes();
3346 let mut child_stdin = child
3347 .stdin
3348 .take()
3349 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
3350 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
3351 child_stdin.write_all(&stdin_bytes)?;
3352 drop(child_stdin);
3353 Ok(())
3354 });
3355
3356 let start = std::time::Instant::now();
3357 let timeout = std::time::Duration::from_secs(timeout_secs);
3358 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
3359 let _ = std::fs::remove_file(&schema_file);
3360
3361 match status {
3362 Some(exit_status) => {
3363 stdin_thread
3364 .join()
3365 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
3366 .map_err(AppError::Io)?;
3367
3368 tracing::debug!(
3369 target: "process",
3370 exit_code = ?exit_status.code(),
3371 elapsed_ms = start.elapsed().as_millis() as u64,
3372 "external process completed"
3373 );
3374
3375 let mut stdout_buf = Vec::new();
3376 if let Some(mut out) = child.stdout.take() {
3377 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
3378 }
3379 if !exit_status.success() {
3380 let mut stderr_buf = Vec::new();
3381 if let Some(mut err) = child.stderr.take() {
3382 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
3383 }
3384 let stderr_str = String::from_utf8_lossy(&stderr_buf);
3385 tracing::warn!(
3386 target: "enrich",
3387 exit_code = ?exit_status.code(),
3388 stderr = %stderr_str.trim(),
3389 "codex process failed"
3390 );
3391 return Err(AppError::Validation(format!(
3392 "codex exited with code {:?}: {}",
3393 exit_status.code(),
3394 stderr_str.trim()
3395 )));
3396 }
3397 let stdout_str = String::from_utf8(stdout_buf)
3398 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
3399 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
3402 let urls: Vec<serde_json::Value> = result
3407 .extraction
3408 .urls
3409 .iter()
3410 .map(|u| serde_json::json!({"url": u.url, "offset": u.offset}))
3411 .collect();
3412 let value = serde_json::json!({
3413 "entities": result.extraction.entities,
3414 "relationships": result.extraction.relationships,
3415 "urls": urls,
3416 "extraction_method": result.extraction.extraction_method,
3417 });
3418 Ok((value, 0.0, false))
3419 }
3420 None => {
3421 let _ = child.kill();
3422 let _ = child.wait();
3423 let _ = stdin_thread.join();
3424 Err(AppError::Validation(format!(
3425 "codex timed out after {timeout_secs} seconds"
3426 )))
3427 }
3428 }
3429}
3430
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Opens an in-memory SQLite database containing the `memories`,
    /// `entities`, `memory_entities` and `relationships` tables used by the
    /// scan/persist helpers exercised below.
    ///
    /// NOTE(review): this is a hand-maintained subset of the production
    /// schema; if the real migrations add columns that the scan queries read,
    /// this fixture must be kept in sync.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(
            "CREATE TABLE memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'note',
                description TEXT NOT NULL DEFAULT '',
                body TEXT NOT NULL DEFAULT '',
                body_hash TEXT NOT NULL DEFAULT '',
                session_id TEXT,
                source TEXT NOT NULL DEFAULT 'agent',
                metadata TEXT NOT NULL DEFAULT '{}',
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                deleted_at INTEGER,
                UNIQUE(namespace, name)
            );
            CREATE TABLE entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                name TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'concept',
                description TEXT,
                degree INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                UNIQUE(namespace, name)
            );
            CREATE TABLE memory_entities (
                memory_id INTEGER NOT NULL,
                entity_id INTEGER NOT NULL,
                PRIMARY KEY (memory_id, entity_id)
            );
            CREATE TABLE relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                namespace TEXT NOT NULL DEFAULT 'global',
                source_id INTEGER NOT NULL,
                target_id INTEGER NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 0.5,
                description TEXT,
                UNIQUE(source_id, target_id, relation)
            );",
        )
        .expect("schema creation must succeed");
        conn
    }

    /// A memory with no `memory_entities` row is reported by the unbound scan.
    #[test]
    fn scan_unbound_memories_finds_memories_without_bindings() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
            [],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "test-mem");
    }

    /// A memory linked to an entity via `memory_entities` is skipped.
    #[test]
    fn scan_unbound_memories_excludes_bound_memories() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
            [],
        )
        .unwrap();
        let mem_id: i64 = conn
            .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
                r.get(0)
            })
            .unwrap();
        conn.execute(
            "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
            [],
        )
        .unwrap();
        let ent_id: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='some-entity'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        conn.execute(
            "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
            rusqlite::params![mem_id, ent_id],
        )
        .unwrap();

        let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
        assert!(results.is_empty(), "bound memory must not appear in scan");
    }

    /// An entity whose `description` is NULL is a description-enrichment candidate.
    #[test]
    fn scan_entities_without_description_finds_null_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "my-tool");
    }

    /// An entity that already has a description is not re-enriched.
    #[test]
    fn scan_entities_without_description_excludes_entities_with_description() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
            [],
        )
        .unwrap();

        let results = scan_entities_without_description(&conn, "global", None).unwrap();
        assert!(
            results.is_empty(),
            "entity with description must not appear"
        );
    }

    /// A body shorter than the threshold is selected for body enrichment.
    #[test]
    fn scan_short_body_memories_finds_short_bodies() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "short-mem");
    }

    /// A body longer than the threshold is left alone.
    #[test]
    fn scan_short_body_memories_excludes_long_bodies() {
        let conn = open_test_db();
        let long_body = "a".repeat(1000);
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
            rusqlite::params![long_body],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 100, None).unwrap();
        assert!(results.is_empty(), "long memory must not appear in scan");
    }

    /// `Some(limit)` caps the number of rows a scan returns.
    #[test]
    fn scan_respects_limit() {
        let conn = open_test_db();
        for i in 0..5 {
            // Bind the name as a parameter instead of splicing it into the SQL text.
            conn.execute(
                "INSERT INTO memories (namespace, name, body) VALUES ('global', ?1, 'short')",
                rusqlite::params![format!("mem-{i}")],
            )
            .unwrap();
        }

        let results = scan_short_body_memories(&conn, "global", 1000, Some(3)).unwrap();
        assert_eq!(results.len(), 3, "limit must be respected");
    }

    /// `open_queue_db` creates an empty `queue` table on a fresh file.
    #[test]
    fn queue_db_schema_creates_correctly() {
        // Use the platform temp dir instead of a hard-coded `/tmp` so the test
        // also runs on non-Unix hosts. The pid keeps concurrent test runs apart.
        let tmp_path = std::env::temp_dir()
            .join(format!("test-enrich-queue-{}.sqlite", std::process::id()))
            .to_string_lossy()
            .into_owned();
        // Remove a leftover from an earlier aborted run so the table starts empty.
        let _ = std::fs::remove_file(&tmp_path);

        let conn = open_queue_db(&tmp_path).expect("queue db must open");
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
            .unwrap();
        // Close the connection before deleting the file, because Windows refuses
        // to remove an open file. Clean up before asserting so a failure does
        // not leak the file.
        drop(conn);
        let _ = std::fs::remove_file(&tmp_path);
        assert_eq!(count, 0);
    }

    /// A successful result record yields the structured output and its cost.
    #[test]
    fn parse_claude_output_valid_bindings() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,
             "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output)
            .expect("must parse successfully");
        assert!(result.value.get("entities").is_some());
        assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
        assert!(!result.is_oauth);
    }

    /// `apiKeySource: "none"` on the init record marks the run as OAuth-backed.
    #[test]
    fn parse_claude_output_detects_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.0,
             "structured_output":{"entities":[],"relationships":[]}}
        ]"#;
        let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
        assert!(result.is_oauth);
    }

    /// An error result mentioning `rate_limit` maps to `AppError::RateLimited`.
    #[test]
    fn parse_claude_output_rate_limit_returns_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Other error results surface their message in the returned error.
    #[test]
    fn parse_claude_output_auth_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// Covers only the DB-side candidate selection that a dry run relies on:
    /// the scan must work without any LLM involvement. It does not drive the
    /// dry-run command path itself.
    #[test]
    fn dry_run_emits_preview_without_calling_llm() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
            [],
        )
        .unwrap();

        let results = scan_short_body_memories(&conn, "global", 1000, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, "dry-mem");
    }

    /// `persist_entity_description` writes the text into `entities.description`.
    #[test]
    fn persist_entity_description_updates_db() {
        let conn = open_test_db();
        conn.execute(
            "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
            [],
        )
        .unwrap();
        let eid: i64 = conn
            .query_row(
                "SELECT id FROM entities WHERE name='tokio-runtime'",
                [],
                |r| r.get(0),
            )
            .unwrap();

        persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();

        let desc: String = conn
            .query_row(
                "SELECT description FROM entities WHERE id=?1",
                rusqlite::params![eid],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(desc, "Async runtime for Rust applications");
    }

    /// The JSON schema handed to the LLM for bindings must parse.
    #[test]
    fn bindings_schema_is_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
    }

    /// The JSON schema for entity descriptions must parse.
    #[test]
    fn entity_description_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
            .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
    }

    /// The JSON schema for body enrichment must parse.
    #[test]
    fn body_enrich_schema_is_valid_json() {
        let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
            .expect("BODY_ENRICH_SCHEMA must be valid JSON");
    }
}