1use crate::commands::ingest_claude::find_claude_binary;
28use crate::constants::MAX_MEMORY_BODY_LEN;
29use crate::entity_type::EntityType;
30use crate::errors::AppError;
31use crate::paths::AppPaths;
32use crate::storage::connection::{ensure_db_ready, open_rw};
33use crate::storage::entities::{self, NewEntity, NewRelationship};
34use crate::storage::memories;
35
36use rusqlite::Connection;
37use serde::{Deserialize, Serialize};
38use std::io::Write;
39use std::path::{Path, PathBuf};
40use std::time::Instant;
41
/// File name used for the enrich work-queue SQLite database.
/// NOTE(review): how this is resolved to a full path is decided at the call
/// site, which is outside this view — confirm there.
const DEFAULT_QUEUE_DB: &str = ".enrich-queue.sqlite";
/// Default wait applied after a rate-limit signal.
/// NOTE(review): the unit is presumably seconds (like the `*_timeout` flags) —
/// confirm at the use site.
const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
/// Default value of the `--min-output-chars` flag on `EnrichArgs`.
const DEFAULT_BODY_ENRICH_MIN_CHARS: usize = 500;
/// Default value of the `--max-output-chars` flag on `EnrichArgs`.
const DEFAULT_BODY_ENRICH_MAX_CHARS: usize = 2000;
50
/// JSON Schema for entity/relationship extraction results.
///
/// The document is an object with two required arrays:
/// - `entities`: objects with a `name` and an `entity_type` restricted to a
///   fixed enum;
/// - `relationships`: objects with `source`, `target`, a `relation` restricted
///   to a fixed enum, and a `strength` in `[0, 1]`.
///
/// Unknown properties are rejected at every level.
const BINDINGS_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["entities","relationships"],
  "additionalProperties": false
}"#;
94
/// JSON Schema for an entity-description answer: an object with a single
/// required string field `description`.
const ENTITY_DESCRIPTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" }
  },
  "required": ["description"],
  "additionalProperties": false
}"#;

/// JSON Schema for a body-enrichment answer: an object with a single required
/// string field `enriched_body`.
const BODY_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "enriched_body": { "type": "string" }
  },
  "required": ["enriched_body"],
  "additionalProperties": false
}"#;
112
/// Instruction text asking the model to re-evaluate a relationship weight
/// against a four-step scale (0.9 / 0.7 / 0.5 / 0.3).
const WEIGHT_CALIBRATE_PROMPT: &str = "You are a knowledge graph quality auditor. Evaluate whether this relationship weight is correctly calibrated.\n\n\
Scale:\n\
- 0.9 = vital hard dependency (A cannot function without B)\n\
- 0.7 = important design relationship (A strongly supports/enables B)\n\
- 0.5 = useful contextual link (A and B share relevant context)\n\
- 0.3 = weak reference (A mentions B without strong coupling)\n\n\
Respond with the calibrated weight and brief reasoning.";

/// JSON Schema paired with the weight-calibration prompt: required
/// `calibrated_weight` in `[0, 1]` and a `reasoning` string.
const WEIGHT_CALIBRATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "calibrated_weight": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["calibrated_weight", "reasoning"],
  "additionalProperties": false
}"#;
131
/// Instruction text asking the model to replace a generic relation with one of
/// the eleven canonical relation names listed in the prompt.
const RELATION_RECLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. The relationship between these entities uses a generic type. Determine the REAL semantic relationship.\n\n\
Valid canonical relations (pick exactly one):\n\
- depends-on: A cannot function without B\n\
- uses: A utilizes B but could substitute it\n\
- supports: A reinforces or enables B\n\
- causes: A triggers or produces B\n\
- fixes: A resolves a problem in B\n\
- contradicts: A conflicts with or invalidates B\n\
- applies-to: A is relevant to or scoped within B\n\
- follows: A comes after B in sequence\n\
- replaces: A substitutes B\n\
- tracked-in: A is monitored in B\n\
- related: A and B share context (use sparingly)\n\n\
Respond with the correct relation, strength, and reasoning.";

/// JSON Schema paired with the reclassification prompt. `relation` is a free
/// string here (the schema itself does not restrict it to the canonical list),
/// `strength` is in `[0, 1]`.
const RELATION_RECLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
158
/// Instruction text asking the model whether two currently unlinked entities
/// should be connected; the model answers with a canonical relation or the
/// literal `"none"`.
const ENTITY_CONNECT_PROMPT: &str = "You are a knowledge graph quality auditor. Two entities exist in the same graph but have no relationship between them. Determine if a meaningful relationship exists.\n\n\
Valid canonical relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, replaces, tracked-in, related.\n\n\
If NO meaningful relationship exists, set relation to \"none\".\n\
Respond with the relation (or \"none\"), strength, and reasoning.";

/// JSON Schema paired with the entity-connect prompt: free-string `relation`
/// (so `"none"` is accepted), `strength` in `[0, 1]`, and `reasoning`.
const ENTITY_CONNECT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "relation": { "type": "string" },
    "strength": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["relation", "strength", "reasoning"],
  "additionalProperties": false
}"#;
175
/// Instruction text asking the model to confirm or correct an entity's type,
/// choosing from the ten type names listed in the prompt.
const ENTITY_TYPE_VALIDATE_PROMPT: &str = "You are a knowledge graph quality auditor. Verify whether this entity's type is correct.\n\n\
Valid entity types: project, tool, person, file, concept, incident, decision, organization, location, date.\n\n\
If the current type is correct, keep it. If wrong, suggest the correct type.\n\
Respond with the validated type and reasoning.";

/// JSON Schema paired with the type-validation prompt: `validated_type`
/// (free string), `was_correct` (boolean), and `reasoning`.
const ENTITY_TYPE_VALIDATE_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "validated_type": { "type": "string" },
    "was_correct": { "type": "boolean" },
    "reasoning": { "type": "string" }
  },
  "required": ["validated_type", "was_correct", "reasoning"],
  "additionalProperties": false
}"#;
192
/// Instruction text asking the model to replace a generic memory description
/// with a 10-20 word semantic one; includes one bad and one good example.
const DESCRIPTION_ENRICH_PROMPT: &str = "You are a knowledge graph quality auditor. This memory has a generic or auto-generated description. Write a concise, semantic description (10-20 words) that captures WHAT this memory is about and WHY it matters.\n\n\
BAD: 'ingested from docs/auth.md'\n\
GOOD: 'JWT token rotation strategy with 15-min expiry and refresh flow'\n\n\
Respond with the improved description and reasoning.";

/// JSON Schema paired with the description-enrich prompt: required
/// `description` and `reasoning` strings.
const DESCRIPTION_ENRICH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "description": { "type": "string" },
    "reasoning": { "type": "string" }
  },
  "required": ["description", "reasoning"],
  "additionalProperties": false
}"#;
208
/// Instruction text asking the model to assign a memory to a primary domain,
/// named in kebab-case with 2-4 words.
const DOMAIN_CLASSIFY_PROMPT: &str = "You are a knowledge graph quality auditor. Classify this memory into its primary domain category.\n\n\
Respond with the domain name (kebab-case, 2-4 words) and reasoning.";

/// JSON Schema paired with the domain-classify prompt: `domain` string,
/// `confidence` in `[0, 1]`, and `reasoning`.
const DOMAIN_CLASSIFY_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "domain": { "type": "string" },
    "confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "reasoning": { "type": "string" }
  },
  "required": ["domain", "confidence", "reasoning"],
  "additionalProperties": false
}"#;
223
/// Instruction text asking the model to audit a memory and its entity
/// bindings and report quality issues plus an overall score.
const GRAPH_AUDIT_PROMPT: &str = "You are a knowledge graph quality auditor. Analyze this memory and its entity bindings for quality issues.\n\n\
Check for: missing entities, wrong entity types, redundant relationships, orphaned entities, generic descriptions, low-signal relationships.\n\n\
Respond with a list of issues found (or empty if none) and an overall quality score.";

/// JSON Schema paired with the graph-audit prompt: `quality_score` in
/// `[0, 1]`, an `issues` array of `{kind, detail}` objects, and `reasoning`.
/// Note that the nested issue objects do not set `additionalProperties`.
const GRAPH_AUDIT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "quality_score": { "type": "number", "minimum": 0.0, "maximum": 1.0 },
    "issues": { "type": "array", "items": { "type": "object", "properties": { "kind": { "type": "string" }, "detail": { "type": "string" } }, "required": ["kind", "detail"] } },
    "reasoning": { "type": "string" }
  },
  "required": ["quality_score", "issues", "reasoning"],
  "additionalProperties": false
}"#;
239
/// Instruction text asking the model to synthesize entities, relationships
/// and a summary from a memory body.
const DEEP_RESEARCH_SYNTH_PROMPT: &str = "You are a knowledge graph synthesizer. Given this memory body, extract key findings and synthesize them into structured entities and relationships.\n\n\
Entity names: lowercase kebab-case, domain-specific.\n\
Relations: depends-on, uses, supports, causes, fixes, contradicts, applies-to, follows, related, replaces, tracked-in.\n\n\
Respond with extracted entities, relationships, and a synthesis summary.";

/// JSON Schema paired with the synthesis prompt: `entities`, `relationships`
/// and `summary`. Unlike `BINDINGS_SCHEMA`, `entity_type` and `relation` are
/// free strings here and `strength` has no numeric bounds.
const DEEP_RESEARCH_SYNTH_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "entities": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "entity_type": { "type": "string" } }, "required": ["name", "entity_type"] } },
    "relationships": { "type": "array", "items": { "type": "object", "properties": { "source": { "type": "string" }, "target": { "type": "string" }, "relation": { "type": "string" }, "strength": { "type": "number" } }, "required": ["source", "target", "relation", "strength"] } },
    "summary": { "type": "string" }
  },
  "required": ["entities", "relationships", "summary"],
  "additionalProperties": false
}"#;
256
/// Instruction text asking the model to restructure an unstructured memory
/// body into clean markdown while preserving all factual content.
const BODY_EXTRACT_PROMPT: &str = "You are a structured data extractor. Given this memory body (which may be unstructured text, raw notes, or a transcript), extract and restructure the content into a clean, well-organized markdown body.\n\n\
Preserve all factual content. Remove noise, fix formatting, add section headers where appropriate.\n\
Respond with the restructured body and a brief summary of changes.";

/// JSON Schema paired with the body-extract prompt: required
/// `restructured_body` and `changes_summary` strings.
const BODY_EXTRACT_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "restructured_body": { "type": "string" },
    "changes_summary": { "type": "string" }
  },
  "required": ["restructured_body", "changes_summary"],
  "additionalProperties": false
}"#;
271
/// Instruction text for entity/relationship extraction from a memory body.
/// The relation names and strength scale it lists match the enum and range
/// declared in `BINDINGS_SCHEMA`.
const BINDINGS_PROMPT: &str = "You are a knowledge graph entity extractor. Given a memory body, extract:\n\
1. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
2. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones";
286
/// Leading part of the entity-description prompt. It ends with
/// `"Entity name: "`, so the caller is expected to append the entity name
/// (and presumably its type — confirm at the call site).
const ENTITY_DESCRIPTION_PROMPT_PREFIX: &str = "You are a knowledge graph annotator. Given an entity name and type, write a concise one-sentence description (10-20 words) that explains what this entity IS and WHY it matters in the context of software/system design.\n\nEntity name: ";

/// Leading part of the body-enrich prompt. It ends with
/// `"Memory body to enrich:\n\n"`, so the caller is expected to append the
/// memory body directly after it.
const BODY_ENRICH_PROMPT_PREFIX: &str = "You are a knowledge assistant. Given a short or sparse memory body, expand it into a richer, more complete and useful description. Preserve all existing facts. Add context, implications, and relationships that would be valuable for knowledge retrieval.\n\nConstraints:\n- Output only the enriched body text (no metadata, no headers)\n- Preserve the original meaning exactly\n- Target length is provided in the system context\n\nMemory body to enrich:\n\n";
290
// Which enrichment pass `enrich` should run (`--operation`).
//
// Plain `//` comments are used on purpose: `///` on a `clap::ValueEnum`
// variant becomes CLI help text, which would change `--help` output.
// Serialized with serde as kebab-case (e.g. `memory-bindings`, `re-embed`).
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum EnrichOperation {
    // Extract entities/relationships for memories (see `BINDINGS_PROMPT`).
    MemoryBindings,
    // Generate descriptions for entities (see `ENTITY_DESCRIPTION_PROMPT_PREFIX`).
    EntityDescriptions,
    // Expand short memory bodies (see `BODY_ENRICH_PROMPT_PREFIX`).
    BodyEnrich,
    // Rebuild memory embeddings (per the CLI examples: without rewriting bodies).
    ReEmbed,
    // Re-evaluate relationship weights (see `WEIGHT_CALIBRATE_PROMPT`).
    WeightCalibrate,
    // Replace generic relation types (see `RELATION_RECLASSIFY_PROMPT`).
    RelationReclassify,
    // Propose relationships between unlinked entities (see `ENTITY_CONNECT_PROMPT`).
    EntityConnect,
    // Verify entity types (see `ENTITY_TYPE_VALIDATE_PROMPT`).
    EntityTypeValidate,
    // Improve generic memory descriptions (see `DESCRIPTION_ENRICH_PROMPT`).
    DescriptionEnrich,
    // NOTE(review): no prompt/schema for this variant is visible in this part
    // of the file — semantics to be confirmed where it is dispatched.
    CrossDomainBridges,
    // Classify memories into a domain (see `DOMAIN_CLASSIFY_PROMPT`).
    DomainClassify,
    // Audit a memory and its bindings (see `GRAPH_AUDIT_PROMPT`).
    GraphAudit,
    // Synthesize entities/relationships/summary (see `DEEP_RESEARCH_SYNTH_PROMPT`).
    DeepResearchSynth,
    // Restructure raw bodies into markdown (see `BODY_EXTRACT_PROMPT`).
    BodyExtract,
}
328
// LLM backend selected with `--mode` (and `--fallback-mode`).
//
// Plain `//` comments are used on purpose: `///` on a `clap::ValueEnum`
// variant becomes CLI help text. The CLI spellings match the `Display` impl:
// `claude-code`, `codex`, `opencode`, `openrouter`.
#[derive(Debug, Clone, PartialEq, Eq, clap::ValueEnum)]
pub enum EnrichMode {
    // Claude Code CLI binary.
    ClaudeCode,
    // Codex CLI binary.
    Codex,
    // Explicit name so the value is `opencode`, not the derived `open-code`... 
    // NOTE(review): derived name for `Opencode` would already be `opencode`;
    // the attribute pins it regardless of clap's casing rules.
    #[value(name = "opencode")]
    Opencode,
    // Explicit name: the derived kebab-case spelling would be `open-router`.
    #[value(name = "openrouter")]
    OpenRouter,
}
343
344impl std::fmt::Display for EnrichMode {
345 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346 match self {
347 EnrichMode::ClaudeCode => write!(f, "claude-code"),
348 EnrichMode::Codex => write!(f, "codex"),
349 EnrichMode::Opencode => write!(f, "opencode"),
350 EnrichMode::OpenRouter => write!(f, "openrouter"),
351 }
352 }
353}
354
355#[derive(clap::Args)]
357#[command(
358 about = "Enrich graph memories and entities using an LLM provider",
359 after_long_help = "EXAMPLES:\n \
360 # Add missing entity bindings to all unbound memories\n \
361 sqlite-graphrag enrich --operation memory-bindings --mode codex --codex-model gpt-5.4-mini\n\n \
362 # Fill entity descriptions (dry-run preview, no tokens spent)\n \
363 sqlite-graphrag enrich --operation entity-descriptions --dry-run --json\n\n \
364 # Expand short memory bodies (GAP-18)\n \
365 sqlite-graphrag enrich --operation body-enrich --min-output-chars 600\n\n \
366 # Rebuild only missing memory embeddings without rewriting bodies\n \
367 sqlite-graphrag enrich --operation re-embed --limit 100\n\n \
368 # Resume an interrupted body-enrich run\n \
369 sqlite-graphrag enrich --operation body-enrich --resume --json\n\n \
370 # Retry only failed items from a previous run\n \
371 sqlite-graphrag enrich --operation memory-bindings --retry-failed --json\n\n\
372 EXIT CODES:\n \
373 0 success\n \
374 1 validation error (bad args, binary not found)\n \
375 14 I/O error"
376)]
// Command-line arguments of the `enrich` subcommand.
//
// Plain `//` comments are used on purpose throughout this struct: with
// `clap::Args`, `///` doc comments become `--help` text, so adding them would
// change CLI output.
pub struct EnrichArgs {
    // Required: which enrichment pass to run (`-o` / `--operation`).
    #[arg(long, short = 'o', value_enum, value_name = "OPERATION")]
    pub operation: EnrichOperation,

    // Required: which LLM backend to use.
    #[arg(long, value_enum)]
    pub mode: EnrichMode,

    // Optional cap on the number of items selected.
    #[arg(long, value_name = "N")]
    pub limit: Option<usize>,

    // Preview only; per the CLI examples no tokens are spent.
    #[arg(long)]
    pub dry_run: bool,

    // Namespace to operate on; also read from SQLITE_GRAPHRAG_NAMESPACE.
    #[arg(long, env = "SQLITE_GRAPHRAG_NAMESPACE")]
    pub namespace: Option<String>,

    // --- Claude Code backend -------------------------------------------------
    // Explicit binary path; passed to `find_claude_binary` as an override.
    #[arg(long, value_name = "PATH")]
    pub claude_binary: Option<PathBuf>,

    #[arg(long, value_name = "MODEL")]
    pub claude_model: Option<String>,

    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
    pub claude_timeout: u64,

    // --- Codex backend -------------------------------------------------------
    // Explicit binary path; passed to `find_codex_binary` as an override.
    #[arg(long, value_name = "PATH")]
    pub codex_binary: Option<PathBuf>,

    #[arg(long, value_name = "MODEL")]
    pub codex_model: Option<String>,

    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
    pub codex_timeout: u64,

    // --- OpenCode backend (each flag also has an env-var fallback) -----------
    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
    pub opencode_binary: Option<PathBuf>,

    #[arg(long, value_name = "MODEL", env = "SQLITE_GRAPHRAG_OPENCODE_MODEL")]
    pub opencode_model: Option<String>,

    #[arg(
        long,
        value_name = "SECONDS",
        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
        default_value_t = 300
    )]
    pub opencode_timeout: u64,

    // --- OpenRouter backend --------------------------------------------------
    #[arg(long, value_name = "MODEL")]
    pub openrouter_model: Option<String>,

    // API key; also read from OPENROUTER_API_KEY. The preflight probe only
    // checks that a key can be resolved.
    #[arg(long, value_name = "KEY", env = "OPENROUTER_API_KEY")]
    pub openrouter_api_key: Option<String>,

    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
    pub openrouter_timeout: u64,

    #[arg(long, value_name = "URL")]
    pub openrouter_base_url: Option<String>,

    // Optional spending cap in USD.
    #[arg(long, value_name = "USD")]
    pub max_cost_usd: Option<f64>,

    // --- Run control ---------------------------------------------------------
    // Resume an interrupted run (see CLI examples).
    #[arg(long)]
    pub resume: bool,

    // Retry only items that failed in a previous run (see CLI examples).
    #[arg(long)]
    pub retry_failed: bool,

    // NOTE(review): presumably keeps looping until no work is left — confirm
    // in the run loop, which is outside this view.
    #[arg(long)]
    pub until_empty: bool,

    // Optional wall-clock budget for the run, in seconds.
    #[arg(long, value_name = "SECONDS")]
    pub max_runtime: Option<u64>,

    // Attempts per item before it is given up on; clap enforces 1..=20.
    #[arg(long, value_name = "N", default_value_t = 5, value_parser = clap::value_parser!(u32).range(1..=20))]
    pub max_attempts: u32,

    // Report status instead of processing (see `EnrichStatus`).
    #[arg(long)]
    pub status: bool,

    // Optional concurrency for REST calls; clap enforces 1..=16.
    #[arg(long, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=16))]
    pub rest_concurrency: Option<u32>,

    // --- body-enrich tuning --------------------------------------------------
    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MIN_CHARS)]
    pub min_output_chars: usize,

    #[arg(long, value_name = "CHARS", default_value_t = DEFAULT_BODY_ENRICH_MAX_CHARS)]
    pub max_output_chars: usize,

    // NOTE(review): a `bool` flag with `default_value_t = true` uses clap's
    // SetTrue action, so it can never be switched off from the command line.
    // Confirm whether an explicit `action` was intended.
    #[arg(long, default_value_t = true)]
    pub preserve_check: bool,

    // Optional file holding a custom prompt template.
    #[arg(long, value_name = "PATH")]
    pub prompt_template: Option<PathBuf>,

    // Number of parallel LLM calls; clap enforces 1..=32, default 1.
    #[arg(long, default_value_t = 1, value_name = "N", value_parser = clap::value_parser!(u32).range(1..=32))]
    pub llm_parallelism: u32,

    // Emit machine-readable JSON output.
    #[arg(long)]
    pub json: bool,

    // Database path; also read from SQLITE_GRAPHRAG_DB_PATH.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // --- Job singleton handling ----------------------------------------------
    // NOTE(review): presumably how long to wait for another running job —
    // confirm where the singleton lock is taken.
    #[arg(long, value_name = "SECONDS")]
    pub wait_job_singleton: Option<u64>,

    #[arg(long, default_value_t = false)]
    pub force_job_singleton: bool,

    // --- Item selection ------------------------------------------------------
    // Comma-separated names; merged with `names_file` by `resolve_name_filter`.
    #[arg(long, value_name = "NAMES", value_delimiter = ',')]
    pub names: Vec<String>,

    // File with one name per line; parsed by `read_names_file`.
    #[arg(long, value_name = "PATH")]
    pub names_file: Option<PathBuf>,

    // --- Backend health ------------------------------------------------------
    // Run a preflight probe (see `run_preflight_probe`) before processing.
    #[arg(long, default_value_t = false)]
    pub preflight_check: bool,

    // Alternative backend; the Claude rate-limit hint suggests `codex` here.
    #[arg(long, value_enum)]
    pub fallback_mode: Option<EnrichMode>,

    // Also used as the preflight probe timeout, with a floor of 60 seconds.
    #[arg(long, value_name = "SECONDS", default_value_t = 300)]
    pub rate_limit_buffer: u64,

    // NOTE(review): same SetTrue/default-true caveat as `preserve_check`.
    #[arg(long, default_value_t = true)]
    pub max_load_check: bool,

    // NOTE(review): presumably consecutive failures before the run aborts —
    // confirm in the run loop.
    #[arg(long, value_name = "N", default_value_t = 5)]
    pub circuit_breaker_threshold: u32,

    // Threshold used together with `preserve_check`; default 0.7.
    #[arg(long, value_name = "FLOAT", default_value_t = 0.7)]
    pub preserve_threshold: f64,

    // NOTE(review): same SetTrue/default-true caveat as `preserve_check`.
    #[arg(long, default_value_t = true)]
    pub codex_model_validate: bool,

    // Model to fall back to for Codex.
    #[arg(long, value_name = "MODEL")]
    pub codex_model_fallback: Option<String>,
}
605
/// Progress record describing a phase of the run.
///
/// Only `phase` is always serialized; every optional field is omitted from
/// the output when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Name of the phase being reported.
    phase: &'a str,
    /// Path of the backend binary, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    binary_path: Option<&'a str>,
    /// Version string, when relevant to the phase.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Total number of items found.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_total: Option<usize>,
    /// Number of items still waiting to be processed.
    #[serde(skip_serializing_if = "Option::is_none")]
    items_pending: Option<usize>,
    /// Parallelism in effect (mirrors `--llm-parallelism`'s type).
    #[serde(skip_serializing_if = "Option::is_none")]
    llm_parallelism: Option<u32>,
}
629
/// Progress record for a single processed item.
///
/// `item`, `status`, `index` and `total` are always serialized; every
/// optional field is omitted from the output when it is `None`.
#[derive(Debug, Serialize)]
struct ItemEvent<'a> {
    /// Identifier of the item being reported.
    item: &'a str,
    /// Outcome label for the item.
    status: &'a str,
    /// Row id of the memory, for memory-based items.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Row id of the entity, for entity-based items.
    #[serde(skip_serializing_if = "Option::is_none")]
    entity_id: Option<i64>,
    /// Entity count associated with the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Relationship count associated with the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Body length before the operation.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_before: Option<usize>,
    /// Body length after the operation.
    #[serde(skip_serializing_if = "Option::is_none")]
    chars_after: Option<usize>,
    /// Cost attributed to this item, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on this item, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error text when the item failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Position of this item within the run.
    /// NOTE(review): zero- or one-based is decided by the emitter — confirm.
    index: usize,
    /// Number of items in the run.
    total: usize,
}
656
/// Final record of a run: item counters, total cost and elapsed time.
#[derive(Debug, Serialize)]
struct EnrichSummary {
    /// Marker field.
    /// NOTE(review): presumably always `true` so consumers can tell the
    /// summary line apart from item/phase lines — confirm at the emitter.
    summary: bool,
    /// Name of the operation that was run.
    operation: String,
    /// Number of items considered.
    items_total: usize,
    /// Number of items that completed.
    completed: usize,
    /// Number of items that failed.
    failed: usize,
    /// Number of items that were skipped.
    skipped: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Elapsed time of the run in milliseconds.
    elapsed_ms: u64,
    /// Backend that was invoked; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'static str>,
}
674
675use crate::output::emit_json_line as emit_json;
676
677fn open_queue_db(path: &str) -> Result<Connection, AppError> {
692 let conn = Connection::open(path)?;
693 conn.pragma_update(None, "journal_mode", "wal")?;
694 conn.execute_batch(
695 "CREATE TABLE IF NOT EXISTS queue (
696 id INTEGER PRIMARY KEY AUTOINCREMENT,
697 item_key TEXT NOT NULL UNIQUE,
698 item_type TEXT NOT NULL DEFAULT 'memory',
699 status TEXT NOT NULL DEFAULT 'pending',
700 memory_id INTEGER,
701 entity_id INTEGER,
702 entities INTEGER DEFAULT 0,
703 rels INTEGER DEFAULT 0,
704 error TEXT,
705 cost_usd REAL DEFAULT 0.0,
706 attempt INTEGER DEFAULT 0,
707 elapsed_ms INTEGER,
708 created_at TEXT DEFAULT (datetime('now')),
709 done_at TEXT
710 );
711 CREATE INDEX IF NOT EXISTS idx_enrich_queue_status ON queue(status);",
712 )?;
713 let mut has_error_class = false;
717 let mut has_next_retry_at = false;
718 {
719 let mut stmt = conn.prepare("PRAGMA table_info(queue)")?;
720 let names = stmt.query_map([], |r| r.get::<_, String>(1))?;
721 for name in names {
722 match name?.as_str() {
723 "error_class" => has_error_class = true,
724 "next_retry_at" => has_next_retry_at = true,
725 _ => {}
726 }
727 }
728 }
729 if !has_error_class {
730 conn.execute_batch("ALTER TABLE queue ADD COLUMN error_class TEXT")?;
731 }
732 if !has_next_retry_at {
733 conn.execute_batch("ALTER TABLE queue ADD COLUMN next_retry_at TEXT")?;
734 }
735 conn.execute_batch(
736 "CREATE INDEX IF NOT EXISTS idx_enrich_queue_eligible ON queue(status, next_retry_at)",
737 )?;
738 Ok(conn)
739}
740
// Snapshot of the enrich backlog and queue counters.
//
// Plain `//` comments are used on purpose: with `schemars::JsonSchema`,
// `///` doc comments are emitted as schema descriptions, so adding them would
// change the generated schema.
#[derive(Debug, Serialize, schemars::JsonSchema)]
pub struct EnrichStatus {
    // Marker field.
    // NOTE(review): presumably always `true` to tag this record type — confirm.
    status_report: bool,
    // Operation and namespace the report refers to.
    operation: String,
    namespace: String,
    // Number of memories without entity bindings.
    unbound_backlog: usize,
    // Row counts per queue `status` value. `pending` and `dead` are the values
    // written by `record_item_failure`; the others are set elsewhere.
    queue_pending: i64,
    queue_processing: i64,
    queue_done: i64,
    queue_failed: i64,
    queue_skipped: i64,
    queue_dead: i64,
    // NOTE(review): presumably pending rows whose `next_retry_at` has passed
    // (`eligible_now`) versus those still backing off (`waiting`) — confirm
    // against the query that fills them.
    eligible_now: i64,
    waiting: i64,
}
761
762fn classify_enrich_outcome(e: &AppError) -> crate::retry::AttemptOutcome {
769 use crate::retry::AttemptOutcome;
770 match e {
771 AppError::RateLimited { .. } | AppError::Timeout { .. } | AppError::DbBusy(_) => {
772 AttemptOutcome::Transient
773 }
774 _ => {
775 let msg = format!("{e}").to_lowercase();
776 if msg.contains("server error")
777 || msg.contains("timed out")
778 || msg.contains("timeout")
779 || msg.contains("connection")
780 || msg.contains("5xx")
781 || msg.contains("502")
782 || msg.contains("503")
783 || msg.contains("504")
784 {
785 AttemptOutcome::Transient
786 } else {
787 AttemptOutcome::HardFailure
788 }
789 }
790 }
791}
792
793fn record_item_failure(
801 queue_conn: &rusqlite::Connection,
802 queue_id: i64,
803 attempt: i64,
804 max_attempts: u32,
805 err: &AppError,
806) -> crate::retry::AttemptOutcome {
807 use crate::retry::AttemptOutcome;
808 let outcome = classify_enrich_outcome(err);
809 let err_str = format!("{err}");
810 let error_class = match outcome {
811 AttemptOutcome::Transient => "transient",
812 AttemptOutcome::HardFailure => "permanent",
813 AttemptOutcome::Success => "success",
814 };
815
816 let terminal = matches!(outcome, AttemptOutcome::HardFailure) || attempt >= max_attempts as i64;
817 if terminal {
818 let _ = queue_conn.execute(
819 "UPDATE queue SET status='dead', error=?1, error_class=?2, done_at=datetime('now') WHERE id=?3",
820 rusqlite::params![err_str, error_class, queue_id],
821 );
822 } else {
823 let delay = crate::retry::compute_delay(
824 &crate::retry::RetryConfig::llm_rate_limit(),
825 attempt.max(0) as u32,
826 );
827 let secs = delay.as_secs().max(1);
828 let modifier = format!("+{secs} seconds");
829 let _ = queue_conn.execute(
830 "UPDATE queue SET status='pending', error=?1, error_class=?2, next_retry_at=datetime('now', ?3) WHERE id=?4",
831 rusqlite::params![err_str, error_class, modifier, queue_id],
832 );
833 }
834 outcome
835}
836
837fn call_claude(
845 binary: &Path,
846 prompt: &str,
847 json_schema: &str,
848 input_text: &str,
849 model: Option<&str>,
850 timeout_secs: u64,
851) -> Result<(serde_json::Value, f64, bool), AppError> {
852 let result = crate::commands::claude_runner::run_claude(
853 binary,
854 prompt,
855 json_schema,
856 input_text,
857 model,
858 timeout_secs,
859 7,
860 )?;
861 Ok((result.value, result.cost_usd, result.is_oauth))
862}
863
864fn call_openrouter(
871 prompt: &str,
872 json_schema: &str,
873 input_text: &str,
874 model: Option<&str>,
875 timeout_secs: u64,
876) -> Result<(serde_json::Value, f64, bool), AppError> {
877 let _ = (model, timeout_secs);
881 let client = crate::embedder::openrouter_chat_client().ok_or_else(|| {
882 AppError::Validation(
883 "OpenRouter chat client not initialised before dispatch (internal error)".into(),
884 )
885 })?;
886 let runtime = crate::embedder::shared_runtime()?;
887 runtime.block_on(client.complete(prompt, input_text, json_schema, None))
888}
889
/// Result of the backend health probe run by `run_preflight_probe`.
enum PreflightOutcome {
    /// The probe succeeded (or, for OpenRouter, an API key could be resolved).
    Healthy,
    /// The probe process failed and its stderr matched a rate-limit marker.
    RateLimited {
        /// Trimmed stderr of the probe process.
        reason: String,
        /// Fixed hint telling the user what to do next.
        suggestion: &'static str,
    },
    /// The probe could not be run, timed out, or failed for another reason.
    Error(AppError),
}
908
909fn run_preflight_probe(args: &EnrichArgs) -> PreflightOutcome {
917 let timeout = std::time::Duration::from_secs(args.rate_limit_buffer.max(60));
918
919 match args.mode {
920 EnrichMode::ClaudeCode => {
921 let bin = match find_claude_binary(args.claude_binary.as_deref()) {
922 Ok(b) => b,
923 Err(e) => return PreflightOutcome::Error(e),
924 };
925 let mcp_config_path = match crate::spawn::preflight::write_empty_mcp_config_tempfile() {
930 Ok(p) => p,
931 Err(e) => {
932 return PreflightOutcome::Error(AppError::Io(e));
933 }
934 };
935 let mut cmd = std::process::Command::new(&bin);
936 crate::spawn::env_whitelist::apply_env_whitelist(
937 &mut cmd,
938 crate::spawn::env_whitelist::is_strict_env_clear(),
939 );
940 if let Err(e) = crate::spawn::apply_cwd_isolation(&mut cmd) {
941 return PreflightOutcome::Error(e);
942 }
943 cmd.arg("-p")
944 .arg("ping")
945 .arg("--max-turns")
946 .arg("1")
947 .arg("--strict-mcp-config")
948 .arg("--mcp-config")
949 .arg(mcp_config_path.as_os_str())
950 .arg("--dangerously-skip-permissions")
951 .arg("--settings")
952 .arg("{\"hooks\":{}}")
953 .arg("--output-format")
954 .arg("json")
955 .stdin(std::process::Stdio::null())
956 .stdout(std::process::Stdio::piped())
957 .stderr(std::process::Stdio::piped());
958
959 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
960 Ok(c) => c,
961 Err(e) => {
962 return PreflightOutcome::Error(AppError::Io(e));
963 }
964 };
965 let output = match wait_with_timeout(child, timeout) {
966 Ok(out) => out,
967 Err(e) => return PreflightOutcome::Error(e),
968 };
969 if !output.status.success() {
970 let stderr = String::from_utf8_lossy(&output.stderr);
971 if stderr.contains("hit your session limit")
972 || stderr.contains("rate_limit")
973 || stderr.contains("429")
974 {
975 return PreflightOutcome::RateLimited {
976 reason: stderr.trim().to_string(),
977 suggestion:
978 "wait for the OAuth window to reset or use --fallback-mode codex",
979 };
980 }
981 return PreflightOutcome::Error(AppError::Validation(format!(
982 "preflight probe failed: {stderr}",
983 stderr = stderr.trim()
984 )));
985 }
986 PreflightOutcome::Healthy
987 }
988 EnrichMode::Codex => {
989 let bin = match find_codex_binary(args.codex_binary.as_deref()) {
990 Ok(b) => b,
991 Err(e) => return PreflightOutcome::Error(e),
992 };
993 super::codex_spawn::validate_codex_model(args.codex_model.as_deref())
994 .map_err(PreflightOutcome::Error)
995 .ok();
996 let schema = "{}";
997 let schema_path = match super::codex_spawn::trusted_schema_path() {
998 Ok(p) => p,
999 Err(e) => return PreflightOutcome::Error(e),
1000 };
1001 let spawn_args = super::codex_spawn::CodexSpawnArgs {
1002 binary: &bin,
1003 prompt: "ping",
1004 json_schema: schema,
1005 input_text: "",
1006 model: args.codex_model.as_deref(),
1007 timeout_secs: args.rate_limit_buffer.max(60),
1008 schema_path: schema_path.clone(),
1009 };
1010 let mut cmd = match super::codex_spawn::build_codex_command(&spawn_args) {
1011 Ok(c) => c,
1012 Err(e) => return PreflightOutcome::Error(e),
1013 };
1014 let child = match super::claude_runner::spawn_with_memory_limit(&mut cmd) {
1015 Ok(c) => c,
1016 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1017 };
1018 let output = match wait_with_timeout(child, timeout) {
1019 Ok(out) => out,
1020 Err(e) => return PreflightOutcome::Error(e),
1021 };
1022 let _ = std::fs::remove_file(&schema_path);
1023 if !output.status.success() {
1024 let stderr = String::from_utf8_lossy(&output.stderr);
1025 if stderr.contains("rate_limit")
1026 || stderr.contains("429")
1027 || stderr.contains("Too Many Requests")
1028 {
1029 return PreflightOutcome::RateLimited {
1030 reason: stderr.trim().to_string(),
1031 suggestion: "wait for the rate-limit window to reset",
1032 };
1033 }
1034 return PreflightOutcome::Error(AppError::Validation(format!(
1035 "preflight probe failed: {stderr}",
1036 stderr = stderr.trim()
1037 )));
1038 }
1039 PreflightOutcome::Healthy
1040 }
1041 EnrichMode::Opencode => {
1042 let bin = match super::opencode_runner::find_opencode_binary_with_override(
1043 args.opencode_binary.as_deref(),
1044 ) {
1045 Ok(b) => b,
1046 Err(e) => return PreflightOutcome::Error(e),
1047 };
1048 let model =
1049 super::opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
1050 let mut cmd =
1051 match super::opencode_runner::build_opencode_command_sync(&bin, &model, "ping", "")
1052 {
1053 Ok(c) => c,
1054 Err(e) => return PreflightOutcome::Error(e),
1055 };
1056 let child = match super::opencode_runner::spawn_opencode(&mut cmd) {
1057 Ok(c) => c,
1058 Err(e) => return PreflightOutcome::Error(AppError::Io(e)),
1059 };
1060 let output = match wait_with_timeout(child, timeout) {
1061 Ok(out) => out,
1062 Err(e) => return PreflightOutcome::Error(e),
1063 };
1064 if !output.status.success() {
1065 let stderr = String::from_utf8_lossy(&output.stderr);
1066 if stderr.contains("rate_limit")
1067 || stderr.contains("429")
1068 || stderr.contains("Too Many Requests")
1069 {
1070 return PreflightOutcome::RateLimited {
1071 reason: stderr.trim().to_string(),
1072 suggestion: "wait for the rate-limit window to reset",
1073 };
1074 }
1075 return PreflightOutcome::Error(AppError::Validation(format!(
1076 "preflight probe failed: {stderr}",
1077 stderr = stderr.trim()
1078 )));
1079 }
1080 PreflightOutcome::Healthy
1081 }
1082 EnrichMode::OpenRouter => {
1083 match crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref()) {
1087 Some(_) => PreflightOutcome::Healthy,
1088 None => PreflightOutcome::Error(AppError::Validation(
1089 "OPENROUTER_API_KEY not found for --mode openrouter preflight".into(),
1090 )),
1091 }
1092 }
1093 }
1094}
1095
1096fn wait_with_timeout(
1098 mut child: std::process::Child,
1099 timeout: std::time::Duration,
1100) -> Result<std::process::Output, AppError> {
1101 use wait_timeout::ChildExt;
1102 let start = std::time::Instant::now();
1103 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
1104 if status.is_none() {
1105 let _ = child.kill();
1106 let _ = child.wait();
1107 return Err(AppError::Validation(format!(
1108 "preflight probe timed out after {}s",
1109 start.elapsed().as_secs()
1110 )));
1111 }
1112 let mut stdout = Vec::new();
1113 if let Some(mut out) = child.stdout.take() {
1114 std::io::Read::read_to_end(&mut out, &mut stdout).map_err(AppError::Io)?;
1115 }
1116 let mut stderr = Vec::new();
1117 if let Some(mut err) = child.stderr.take() {
1118 std::io::Read::read_to_end(&mut err, &mut stderr).map_err(AppError::Io)?;
1119 }
1120 let exit = status.unwrap();
1121 Ok(std::process::Output {
1122 status: exit,
1123 stdout,
1124 stderr,
1125 })
1126}
1127
1128fn scan_unbound_memories(
1139 conn: &Connection,
1140 namespace: &str,
1141 limit: Option<usize>,
1142 name_filter: &[String],
1143) -> Result<Vec<(i64, String, String)>, AppError> {
1144 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1145
1146 if name_filter.is_empty() {
1147 let sql = format!(
1148 "SELECT m.id, m.name, m.body
1149 FROM memories m
1150 WHERE m.namespace = ?1
1151 AND m.deleted_at IS NULL
1152 AND NOT EXISTS (
1153 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
1154 )
1155 ORDER BY m.id
1156 {limit_clause}"
1157 );
1158 let mut stmt = conn.prepare(&sql)?;
1159 let rows = stmt
1160 .query_map(rusqlite::params![namespace], |r| {
1161 Ok((
1162 r.get::<_, i64>(0)?,
1163 r.get::<_, String>(1)?,
1164 r.get::<_, String>(2)?,
1165 ))
1166 })?
1167 .collect::<Result<Vec<_>, _>>()?;
1168 Ok(rows)
1169 } else {
1170 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1172 .map(|i| format!("?{i}"))
1173 .collect();
1174 let in_clause = placeholders.join(", ");
1175 let sql = format!(
1176 "SELECT m.id, m.name, m.body
1177 FROM memories m
1178 WHERE m.namespace = ?1
1179 AND m.deleted_at IS NULL
1180 AND m.name IN ({in_clause})
1181 AND NOT EXISTS (
1182 SELECT 1 FROM memory_entities me WHERE me.memory_id = m.id
1183 )
1184 ORDER BY m.id
1185 {limit_clause}"
1186 );
1187 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1188 params_vec.push(&namespace);
1189 for n in name_filter {
1190 params_vec.push(n);
1191 }
1192 let mut stmt = conn.prepare(&sql)?;
1193 let rows = stmt
1194 .query_map(
1195 rusqlite::params_from_iter(params_vec.iter().copied()),
1196 |r| {
1197 Ok((
1198 r.get::<_, i64>(0)?,
1199 r.get::<_, String>(1)?,
1200 r.get::<_, String>(2)?,
1201 ))
1202 },
1203 )?
1204 .collect::<Result<Vec<_>, _>>()?;
1205 Ok(rows)
1206 }
1207}
1208
1209fn read_names_file(path: &Path) -> Result<Vec<String>, AppError> {
1214 let content = std::fs::read_to_string(path).map_err(|e| {
1215 AppError::Validation(format!("failed to read names file {}: {e}", path.display()))
1216 })?;
1217 let mut seen = std::collections::HashSet::new();
1218 let mut out = Vec::new();
1219 for line in content.lines() {
1220 let trimmed = line.trim();
1221 if trimmed.is_empty() || trimmed.starts_with('#') {
1222 continue;
1223 }
1224 if seen.insert(trimmed.to_string()) {
1225 out.push(trimmed.to_string());
1226 }
1227 }
1228 Ok(out)
1229}
1230
1231fn resolve_name_filter(args: &EnrichArgs) -> Result<Vec<String>, AppError> {
1233 let mut combined: Vec<String> = args.names.clone();
1234 if let Some(p) = &args.names_file {
1235 let from_file = read_names_file(p)?;
1236 for n in from_file {
1237 if !combined.contains(&n) {
1238 combined.push(n);
1239 }
1240 }
1241 }
1242 Ok(combined)
1243}
1244
1245fn scan_entities_without_description(
1249 conn: &Connection,
1250 namespace: &str,
1251 limit: Option<usize>,
1252 name_filter: &[String],
1253) -> Result<Vec<(i64, String, String)>, AppError> {
1254 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1255
1256 if name_filter.is_empty() {
1257 let sql = format!(
1258 "SELECT id, name, type
1259 FROM entities
1260 WHERE namespace = ?1
1261 AND (description IS NULL OR description = '')
1262 ORDER BY id
1263 {limit_clause}"
1264 );
1265 let mut stmt = conn.prepare(&sql)?;
1266 let rows = stmt
1267 .query_map(rusqlite::params![namespace], |r| {
1268 Ok((
1269 r.get::<_, i64>(0)?,
1270 r.get::<_, String>(1)?,
1271 r.get::<_, String>(2)?,
1272 ))
1273 })?
1274 .collect::<Result<Vec<_>, _>>()?;
1275 Ok(rows)
1276 } else {
1277 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1278 .map(|i| format!("?{i}"))
1279 .collect();
1280 let in_clause = placeholders.join(", ");
1281 let sql = format!(
1282 "SELECT id, name, type
1283 FROM entities
1284 WHERE namespace = ?1
1285 AND name IN ({in_clause})
1286 AND (description IS NULL OR description = '')
1287 ORDER BY id
1288 {limit_clause}"
1289 );
1290 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1291 params_vec.push(&namespace);
1292 for n in name_filter {
1293 params_vec.push(n);
1294 }
1295 let mut stmt = conn.prepare(&sql)?;
1296 let rows = stmt
1297 .query_map(
1298 rusqlite::params_from_iter(params_vec.iter().copied()),
1299 |r| {
1300 Ok((
1301 r.get::<_, i64>(0)?,
1302 r.get::<_, String>(1)?,
1303 r.get::<_, String>(2)?,
1304 ))
1305 },
1306 )?
1307 .collect::<Result<Vec<_>, _>>()?;
1308 Ok(rows)
1309 }
1310}
1311
1312fn scan_short_body_memories(
1316 conn: &Connection,
1317 namespace: &str,
1318 min_chars: usize,
1319 limit: Option<usize>,
1320 name_filter: &[String],
1321) -> Result<Vec<(i64, String, String)>, AppError> {
1322 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1323
1324 if name_filter.is_empty() {
1325 let sql = format!(
1326 "SELECT m.id, m.name, m.body
1327 FROM memories m
1328 WHERE m.namespace = ?1
1329 AND m.deleted_at IS NULL
1330 AND LENGTH(COALESCE(m.body,'')) < ?2
1331 ORDER BY m.id
1332 {limit_clause}"
1333 );
1334 let mut stmt = conn.prepare(&sql)?;
1335 let rows = stmt
1336 .query_map(rusqlite::params![namespace, min_chars as i64], |r| {
1337 Ok((
1338 r.get::<_, i64>(0)?,
1339 r.get::<_, String>(1)?,
1340 r.get::<_, String>(2)?,
1341 ))
1342 })?
1343 .collect::<Result<Vec<_>, _>>()?;
1344 Ok(rows)
1345 } else {
1346 let placeholders: Vec<String> = (3..=name_filter.len() + 2)
1347 .map(|i| format!("?{i}"))
1348 .collect();
1349 let in_clause = placeholders.join(", ");
1350 let sql = format!(
1351 "SELECT m.id, m.name, m.body
1352 FROM memories m
1353 WHERE m.namespace = ?1
1354 AND m.deleted_at IS NULL
1355 AND m.name IN ({in_clause})
1356 AND LENGTH(COALESCE(m.body,'')) < ?2
1357 ORDER BY m.id
1358 {limit_clause}"
1359 );
1360 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(2 + name_filter.len());
1361 let min_chars_i64 = min_chars as i64;
1362 params_vec.push(&namespace);
1363 params_vec.push(&min_chars_i64);
1364 for n in name_filter {
1365 params_vec.push(n);
1366 }
1367 let mut stmt = conn.prepare(&sql)?;
1368 let rows = stmt
1369 .query_map(
1370 rusqlite::params_from_iter(params_vec.iter().copied()),
1371 |r| {
1372 Ok((
1373 r.get::<_, i64>(0)?,
1374 r.get::<_, String>(1)?,
1375 r.get::<_, String>(2)?,
1376 ))
1377 },
1378 )?
1379 .collect::<Result<Vec<_>, _>>()?;
1380 Ok(rows)
1381 }
1382}
1383
1384fn scan_memories_without_embeddings(
1388 conn: &Connection,
1389 namespace: &str,
1390 limit: Option<usize>,
1391 name_filter: &[String],
1392) -> Result<Vec<(i64, String, String)>, AppError> {
1393 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1394
1395 if name_filter.is_empty() {
1396 let sql = format!(
1397 "SELECT m.id, m.name, COALESCE(m.body,'')
1398 FROM memories m
1399 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1400 WHERE m.namespace = ?1
1401 AND m.deleted_at IS NULL
1402 AND me.memory_id IS NULL
1403 ORDER BY m.id
1404 {limit_clause}"
1405 );
1406 let mut stmt = conn.prepare(&sql)?;
1407 let rows = stmt
1408 .query_map(rusqlite::params![namespace], |r| {
1409 Ok((
1410 r.get::<_, i64>(0)?,
1411 r.get::<_, String>(1)?,
1412 r.get::<_, String>(2)?,
1413 ))
1414 })?
1415 .collect::<Result<Vec<_>, _>>()?;
1416 Ok(rows)
1417 } else {
1418 let placeholders: Vec<String> = (2..=name_filter.len() + 1)
1419 .map(|i| format!("?{i}"))
1420 .collect();
1421 let in_clause = placeholders.join(", ");
1422 let sql = format!(
1423 "SELECT m.id, m.name, COALESCE(m.body,'')
1424 FROM memories m
1425 LEFT JOIN memory_embeddings me ON me.memory_id = m.id
1426 WHERE m.namespace = ?1
1427 AND m.deleted_at IS NULL
1428 AND m.name IN ({in_clause})
1429 AND me.memory_id IS NULL
1430 ORDER BY m.id
1431 {limit_clause}"
1432 );
1433 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::with_capacity(1 + name_filter.len());
1434 params_vec.push(&namespace);
1435 for n in name_filter {
1436 params_vec.push(n);
1437 }
1438 let mut stmt = conn.prepare(&sql)?;
1439 let rows = stmt
1440 .query_map(
1441 rusqlite::params_from_iter(params_vec.iter().copied()),
1442 |r| {
1443 Ok((
1444 r.get::<_, i64>(0)?,
1445 r.get::<_, String>(1)?,
1446 r.get::<_, String>(2)?,
1447 ))
1448 },
1449 )?
1450 .collect::<Result<Vec<_>, _>>()?;
1451 Ok(rows)
1452 }
1453}
1454
1455#[allow(clippy::type_complexity)]
1457fn scan_weight_candidates(
1458 conn: &Connection,
1459 namespace: &str,
1460 limit: Option<usize>,
1461) -> Result<Vec<(i64, String, String, String, f64)>, AppError> {
1462 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1463 let sql = format!(
1464 "SELECT r.id, e1.name, e2.name, r.relation, r.weight \
1465 FROM relationships r \
1466 JOIN entities e1 ON e1.id = r.source_id \
1467 JOIN entities e2 ON e2.id = r.target_id \
1468 WHERE r.weight >= 0.7 AND e1.namespace = ?1 \
1469 ORDER BY r.weight DESC {limit_clause}"
1470 );
1471 let mut stmt = conn.prepare(&sql)?;
1472 let rows = stmt
1473 .query_map(rusqlite::params![namespace], |r| {
1474 Ok((
1475 r.get::<_, i64>(0)?,
1476 r.get::<_, String>(1)?,
1477 r.get::<_, String>(2)?,
1478 r.get::<_, String>(3)?,
1479 r.get::<_, f64>(4)?,
1480 ))
1481 })?
1482 .collect::<Result<Vec<_>, _>>()?;
1483 Ok(rows)
1484}
1485
1486fn scan_generic_relations(
1488 conn: &Connection,
1489 namespace: &str,
1490 limit: Option<usize>,
1491) -> Result<Vec<(i64, String, String, String)>, AppError> {
1492 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
1493 let sql = format!(
1494 "SELECT r.id, e1.name, e2.name, r.relation \
1495 FROM relationships r \
1496 JOIN entities e1 ON e1.id = r.source_id \
1497 JOIN entities e2 ON e2.id = r.target_id \
1498 WHERE r.relation = 'applies_to' AND e1.namespace = ?1 \
1499 ORDER BY r.id {limit_clause}"
1500 );
1501 let mut stmt = conn.prepare(&sql)?;
1502 let rows = stmt
1503 .query_map(rusqlite::params![namespace], |r| {
1504 Ok((
1505 r.get::<_, i64>(0)?,
1506 r.get::<_, String>(1)?,
1507 r.get::<_, String>(2)?,
1508 r.get::<_, String>(3)?,
1509 ))
1510 })?
1511 .collect::<Result<Vec<_>, _>>()?;
1512 Ok(rows)
1513}
1514
1515fn persist_memory_bindings(
1524 conn: &Connection,
1525 namespace: &str,
1526 memory_id: i64,
1527 entities_json: &serde_json::Value,
1528 rels_json: &serde_json::Value,
1529) -> Result<(usize, usize), AppError> {
1530 #[derive(Deserialize)]
1531 struct EntityItem {
1532 name: String,
1533 entity_type: String,
1534 }
1535 #[derive(Deserialize)]
1536 struct RelItem {
1537 source: String,
1538 target: String,
1539 relation: String,
1540 strength: f64,
1541 }
1542
1543 let extracted_entities: Vec<EntityItem> = serde_json::from_value(entities_json.clone())
1544 .map_err(|e| AppError::Validation(format!("failed to parse entities array: {e}")))?;
1545
1546 let extracted_rels: Vec<RelItem> = serde_json::from_value(rels_json.clone())
1547 .map_err(|e| AppError::Validation(format!("failed to parse relationships array: {e}")))?;
1548
1549 let mut ent_count = 0usize;
1550 let mut rel_count = 0usize;
1551
1552 for item in &extracted_entities {
1553 let entity_type = match item.entity_type.parse::<EntityType>() {
1554 Ok(et) => et,
1555 Err(_) => {
1556 tracing::warn!(
1557 target: "enrich",
1558 entity = %item.name,
1559 entity_type = %item.entity_type,
1560 "entity type not recognized, skipping"
1561 );
1562 continue;
1563 }
1564 };
1565 match entities::upsert_entity(
1566 conn,
1567 namespace,
1568 &NewEntity {
1569 name: item.name.clone(),
1570 entity_type,
1571 description: None,
1572 },
1573 ) {
1574 Ok(eid) => {
1575 let _ = entities::link_memory_entity(conn, memory_id, eid);
1576 ent_count += 1;
1577 }
1578 Err(e) => {
1579 tracing::warn!(
1580 target: "enrich",
1581 entity = %item.name,
1582 error = %e,
1583 "entity upsert skipped"
1584 );
1585 }
1586 }
1587 }
1588
1589 for rel in &extracted_rels {
1590 let normalized = crate::parsers::normalize_relation(&rel.relation);
1591 crate::parsers::warn_if_non_canonical(&normalized);
1592
1593 let src_name = crate::parsers::normalize_entity_name(&rel.source);
1596 let tgt_name = crate::parsers::normalize_entity_name(&rel.target);
1597 let src_id = entities::find_entity_id(conn, namespace, &src_name);
1598 let tgt_id = entities::find_entity_id(conn, namespace, &tgt_name);
1599 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1600 let new_rel = NewRelationship {
1601 source: rel.source.clone(),
1602 target: rel.target.clone(),
1603 relation: normalized,
1604 strength: rel.strength,
1605 description: None,
1606 };
1607 if entities::upsert_relationship(conn, namespace, sid, tid, &new_rel).is_ok() {
1608 rel_count += 1;
1609 }
1610 }
1611 }
1612
1613 Ok((ent_count, rel_count))
1614}
1615
1616fn persist_entity_description(
1618 conn: &Connection,
1619 entity_id: i64,
1620 description: &str,
1621) -> Result<(), AppError> {
1622 conn.execute(
1623 "UPDATE entities SET description = ?1, updated_at = unixepoch() WHERE id = ?2",
1624 rusqlite::params![description, entity_id],
1625 )?;
1626 Ok(())
1627}
1628
1629#[allow(clippy::too_many_arguments)]
1635fn reembed_memory_vector(
1636 conn: &Connection,
1637 namespace: &str,
1638 memory_id: i64,
1639 memory_name: &str,
1640 memory_type: &str,
1641 body: &str,
1642 paths: &crate::paths::AppPaths,
1643 llm_backend: crate::cli::LlmBackendChoice,
1644 embedding_backend: crate::cli::EmbeddingBackendChoice,
1645) -> Result<(), AppError> {
1646 let snippet: String = body.chars().take(200).collect();
1647 let (embedding, backend_kind) = crate::embedder::embed_passage_with_embedding_choice(
1653 &paths.models,
1654 body,
1655 embedding_backend,
1656 llm_backend,
1657 )?;
1658 record_enrich_backend(backend_kind.as_str());
1659 memories::upsert_vec(
1660 conn,
1661 memory_id,
1662 namespace,
1663 memory_type,
1664 &embedding,
1665 memory_name,
1666 &snippet,
1667 )?;
1668 Ok(())
1669}
1670
/// Remembers `backend` as the most recently used embedding backend,
/// overwriting any earlier value.
///
/// If the mutex is poisoned the value is silently not recorded.
fn record_enrich_backend(backend: &'static str) {
    let locked = ENRICH_LAST_BACKEND.lock();
    if let Ok(mut slot) = locked {
        *slot = Some(backend);
    }
}

/// Returns the most recently recorded backend and clears the slot, so a second
/// call yields `None` until something is recorded again.
///
/// A poisoned mutex is reported as `None`.
fn take_enrich_backend() -> Option<&'static str> {
    match ENRICH_LAST_BACKEND.lock() {
        Ok(mut slot) => slot.take(),
        Err(_) => None,
    }
}

/// Process-wide slot holding the name of the last embedding backend recorded
/// by `record_enrich_backend`; drained by `take_enrich_backend`.
static ENRICH_LAST_BACKEND: std::sync::Mutex<Option<&'static str>> =
    std::sync::Mutex::new(None);
1687
1688#[allow(clippy::too_many_arguments)]
1693fn persist_enriched_body(
1694 conn: &Connection,
1695 namespace: &str,
1696 memory_id: i64,
1697 memory_name: &str,
1698 new_body: &str,
1699 paths: &crate::paths::AppPaths,
1700 llm_backend: crate::cli::LlmBackendChoice,
1701 embedding_backend: crate::cli::EmbeddingBackendChoice,
1702) -> Result<(), AppError> {
1703 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
1705 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
1706 rusqlite::params![memory_id],
1707 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1708 )?;
1709
1710 let memory_type: String = conn.query_row(
1711 "SELECT type FROM memories WHERE id=?1",
1712 rusqlite::params![memory_id],
1713 |r| r.get(0),
1714 )?;
1715
1716 let description: String = conn.query_row(
1717 "SELECT COALESCE(description,'') FROM memories WHERE id=?1",
1718 rusqlite::params![memory_id],
1719 |r| r.get(0),
1720 )?;
1721
1722 let body_hash = blake3::hash(new_body.as_bytes()).to_hex().to_string();
1723
1724 let new_memory = memories::NewMemory {
1725 namespace: namespace.to_string(),
1726 name: memory_name.to_string(),
1727 memory_type: memory_type.clone(),
1728 description: description.clone(),
1729 body: new_body.to_string(),
1730 body_hash,
1731 session_id: None,
1732 source: "agent".to_string(),
1733 metadata: serde_json::json!({
1734 "operation": "body-enrich",
1735 "orig_chars": old_body.chars().count(),
1736 "new_chars": new_body.chars().count(),
1737 }),
1738 };
1739
1740 let next_version = crate::storage::versions::next_version(conn, memory_id)?;
1744 let version_metadata = serde_json::json!({
1745 "operation": "body-enrich",
1746 "orig_chars": old_body.chars().count(),
1747 "new_chars": new_body.chars().count(),
1748 })
1749 .to_string();
1750 crate::storage::versions::insert_version(
1751 conn,
1752 memory_id,
1753 next_version,
1754 memory_name,
1755 &memory_type,
1756 &description,
1757 new_body,
1758 &version_metadata,
1759 Some("enrich"),
1760 "edit",
1761 )?;
1762
1763 memories::update(conn, memory_id, &new_memory, None)?;
1764 memories::sync_fts_after_update(
1765 conn,
1766 memory_id,
1767 &old_name,
1768 &old_desc,
1769 &old_body,
1770 &new_memory.name,
1771 &new_memory.description,
1772 &new_memory.body,
1773 )?;
1774
1775 if let Err(e) = reembed_memory_vector(
1777 conn,
1778 namespace,
1779 memory_id,
1780 memory_name,
1781 &memory_type,
1782 new_body,
1783 paths,
1784 llm_backend,
1785 embedding_backend,
1786 ) {
1787 tracing::warn!(target: "enrich", memory = %memory_name, error = %e, "vec upsert failed after body-enrich");
1788 }
1789
1790 Ok(())
1791}
1792
/// Reports whether `value` equals `default`.
///
/// Used to tell whether a flag that always carries a value (such as a
/// timeout) was left untouched.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1807
1808fn validate_mode_conditional_flags_enrich(args: &EnrichArgs) -> Result<(), AppError> {
1823 const DEFAULT_TIMEOUT: u64 = 300;
1824
1825 let mut conflicts: Vec<String> = Vec::new();
1826
1827 match args.mode {
1828 EnrichMode::ClaudeCode => {
1829 if args.codex_binary.is_some() {
1830 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1831 }
1832 if args.codex_model.is_some() {
1833 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1834 }
1835 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1836 conflicts.push(format!(
1837 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1838 args.codex_timeout
1839 ));
1840 }
1841 }
1842 EnrichMode::Codex => {
1843 if args.claude_binary.is_some() {
1844 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1845 }
1846 if args.claude_model.is_some() {
1847 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1848 }
1849 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1850 conflicts.push(format!(
1851 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1852 args.claude_timeout
1853 ));
1854 }
1855 if args.max_cost_usd.is_some() {
1856 conflicts.push(
1857 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription, not the call)"
1858 .to_string(),
1859 );
1860 }
1861 }
1862 EnrichMode::Opencode => {
1863 if args.claude_binary.is_some() {
1864 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1865 }
1866 if args.claude_model.is_some() {
1867 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1868 }
1869 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1870 conflicts.push(format!(
1871 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1872 args.claude_timeout
1873 ));
1874 }
1875 if args.max_cost_usd.is_some() {
1876 conflicts.push(
1877 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription, not the call)"
1878 .to_string(),
1879 );
1880 }
1881 }
1882 EnrichMode::OpenRouter => {
1883 if args.claude_binary.is_some() {
1884 conflicts.push("--claude-binary is ignored when --mode=openrouter".to_string());
1885 }
1886 if args.claude_model.is_some() {
1887 conflicts.push("--claude-model is ignored when --mode=openrouter".to_string());
1888 }
1889 if args.codex_binary.is_some() {
1890 conflicts.push("--codex-binary is ignored when --mode=openrouter".to_string());
1891 }
1892 if args.codex_model.is_some() {
1893 conflicts.push("--codex-model is ignored when --mode=openrouter".to_string());
1894 }
1895 if args.opencode_binary.is_some() {
1896 conflicts.push("--opencode-binary is ignored when --mode=openrouter".to_string());
1897 }
1898 if args.opencode_model.is_some() {
1899 conflicts.push("--opencode-model is ignored when --mode=openrouter".to_string());
1900 }
1901 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1902 conflicts.push(format!(
1903 "--claude-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1904 args.claude_timeout
1905 ));
1906 }
1907 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1908 conflicts.push(format!(
1909 "--codex-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1910 args.codex_timeout
1911 ));
1912 }
1913 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1914 conflicts.push(format!(
1915 "--opencode-timeout={} is ignored when --mode=openrouter (remove the flag to use the default 300s)",
1916 args.opencode_timeout
1917 ));
1918 }
1919 }
1920 }
1921
1922 if !conflicts.is_empty() {
1923 return Err(AppError::Validation(format!(
1924 "G20: mode-conditional flag conflicts detected for --mode={}:\n - {}",
1925 args.mode,
1926 conflicts.join("\n - ")
1927 )));
1928 }
1929
1930 Ok(())
1931}
1932
1933pub fn run(
1937 args: &EnrichArgs,
1938 llm_backend: crate::cli::LlmBackendChoice,
1939 embedding_backend: crate::cli::EmbeddingBackendChoice,
1940) -> Result<(), AppError> {
1941 validate_mode_conditional_flags_enrich(args)?;
1944
1945 if args.status {
1950 let paths = AppPaths::resolve(args.db.as_deref())?;
1951 ensure_db_ready(&paths)?;
1952 let conn = open_rw(&paths.db)?;
1953 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1954 let unbound_backlog = scan_unbound_memories(&conn, &namespace, None, &[])?.len();
1955 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
1956 let count_status = |st: &str| -> i64 {
1957 queue_conn
1958 .query_row(
1959 "SELECT COUNT(*) FROM queue WHERE status=?1",
1960 rusqlite::params![st],
1961 |r| r.get(0),
1962 )
1963 .unwrap_or(0)
1964 };
1965 let eligible_now: i64 = queue_conn
1966 .query_row(
1967 "SELECT COUNT(*) FROM queue WHERE status='pending' \
1968 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))",
1969 [],
1970 |r| r.get(0),
1971 )
1972 .unwrap_or(0);
1973 let waiting: i64 = queue_conn
1974 .query_row(
1975 "SELECT COUNT(*) FROM queue WHERE status='pending' \
1976 AND next_retry_at IS NOT NULL AND next_retry_at > datetime('now')",
1977 [],
1978 |r| r.get(0),
1979 )
1980 .unwrap_or(0);
1981 emit_json(&EnrichStatus {
1982 status_report: true,
1983 operation: format!("{:?}", args.operation),
1984 namespace: namespace.clone(),
1985 unbound_backlog,
1986 queue_pending: count_status("pending"),
1987 queue_processing: count_status("processing"),
1988 queue_done: count_status("done"),
1989 queue_failed: count_status("failed"),
1990 queue_skipped: count_status("skipped"),
1991 queue_dead: count_status("dead"),
1992 eligible_now,
1993 waiting,
1994 });
1995 return Ok(());
1996 }
1997
1998 if args.mode == EnrichMode::OpenRouter {
2003 let model = args.openrouter_model.as_deref().ok_or_else(|| {
2004 AppError::Validation(
2005 "--mode openrouter requires --openrouter-model (no default model is allowed)"
2006 .into(),
2007 )
2008 })?;
2009 let resolved =
2010 crate::config::resolve_api_key("openrouter", args.openrouter_api_key.as_deref())
2011 .ok_or_else(|| {
2012 AppError::Validation(
2013 "OPENROUTER_API_KEY not found; set the env var, store it via \
2014 `config add-key --provider openrouter`, or pass --openrouter-api-key"
2015 .into(),
2016 )
2017 })?;
2018 crate::embedder::get_openrouter_chat_client(
2019 resolved.value,
2020 model,
2021 args.openrouter_timeout,
2022 )?;
2023 }
2024
2025 let started = Instant::now();
2026
2027 let paths = AppPaths::resolve(args.db.as_deref())?;
2028 ensure_db_ready(&paths)?;
2029 let conn = open_rw(&paths.db)?;
2030 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
2031
2032 let wait_secs = args.wait_job_singleton;
2038 let force_flag = args.force_job_singleton;
2039 let _singleton = crate::lock::acquire_job_singleton(
2040 crate::lock::JobType::Enrich,
2041 &namespace,
2042 &paths.db,
2043 wait_secs,
2044 force_flag,
2045 )?;
2046
2047 let provider_binary = if matches!(args.operation, EnrichOperation::ReEmbed) {
2049 None
2050 } else {
2051 Some(match args.mode {
2052 EnrichMode::ClaudeCode => {
2053 let bin = find_claude_binary(args.claude_binary.as_deref())?;
2054 let version = super::claude_runner::validate_claude_version(&bin)?;
2055 tracing::info!(target: "enrich", binary = %bin.display(), version = %version, "Claude Code binary validated");
2056 emit_json(&PhaseEvent {
2057 phase: "validate",
2058 binary_path: bin.to_str(),
2059 version: Some(&version),
2060 items_total: None,
2061 items_pending: None,
2062 llm_parallelism: None,
2063 });
2064 bin
2065 }
2066 EnrichMode::Codex => {
2067 let bin = find_codex_binary(args.codex_binary.as_deref())?;
2068 emit_json(&PhaseEvent {
2069 phase: "validate",
2070 binary_path: bin.to_str(),
2071 version: None,
2072 items_total: None,
2073 items_pending: None,
2074 llm_parallelism: None,
2075 });
2076 bin
2077 }
2078 EnrichMode::Opencode => {
2079 let bin = super::opencode_runner::find_opencode_binary_with_override(
2080 args.opencode_binary.as_deref(),
2081 )?;
2082 emit_json(&PhaseEvent {
2083 phase: "validate",
2084 binary_path: bin.to_str(),
2085 version: None,
2086 items_total: None,
2087 items_pending: None,
2088 llm_parallelism: None,
2089 });
2090 bin
2091 }
2092 EnrichMode::OpenRouter => {
2093 emit_json(&PhaseEvent {
2098 phase: "validate",
2099 binary_path: None,
2100 version: None,
2101 items_total: None,
2102 items_pending: None,
2103 llm_parallelism: None,
2104 });
2105 PathBuf::new()
2106 }
2107 })
2108 };
2109
2110 if args.max_load_check && !args.dry_run && crate::system_load::is_system_saturated() {
2114 let load = crate::system_load::load_average_one();
2115 let n = crate::system_load::ncpus();
2116 return Err(AppError::Validation(format!(
2117 "system load average {load:.2} exceeds 2x ncpus ({n}); \
2118 pass --no-max-load-check to override (not recommended)"
2119 )));
2120 }
2121
2122 if args.preflight_check && !args.dry_run && !matches!(args.operation, EnrichOperation::ReEmbed)
2129 {
2130 let preflight_result = run_preflight_probe(args);
2131 match preflight_result {
2132 PreflightOutcome::Healthy => {
2133 tracing::info!(target: "enrich", mode = ?args.mode, "preflight probe healthy");
2134 }
2135 PreflightOutcome::RateLimited { reason, suggestion } => {
2136 if let Some(fallback) = args.fallback_mode.clone() {
2137 if fallback != args.mode {
2138 return Err(AppError::Validation(format!(
2148 "preflight detected rate limit on {mode:?}: {reason}; \
2149 re-invoke with `--mode {fallback:?}` to use the fallback provider",
2150 mode = args.mode
2151 )));
2152 }
2153 return Err(AppError::Validation(format!(
2154 "preflight detected rate limit on {mode:?}: {reason}; \
2155 --fallback-mode matches --mode, no recovery possible",
2156 mode = args.mode
2157 )));
2158 }
2159 return Err(AppError::Validation(format!(
2160 "preflight detected rate limit on {mode:?}: {reason}; \
2161 {suggestion}; pass --fallback-mode codex to recover",
2162 mode = args.mode
2163 )));
2164 }
2165 PreflightOutcome::Error(e) => {
2166 return Err(e);
2167 }
2168 }
2169 }
2170
2171 let scan_result = scan_operation(&conn, &namespace, args)?;
2173 let total = scan_result.len();
2174
2175 emit_json(&PhaseEvent {
2176 phase: "scan",
2177 binary_path: None,
2178 version: None,
2179 items_total: Some(total),
2180 items_pending: Some(total),
2181 llm_parallelism: Some(args.llm_parallelism),
2182 });
2183
2184 if args.dry_run {
2186 for (idx, key) in scan_result.iter().enumerate() {
2187 emit_json(&ItemEvent {
2188 item: key,
2189 status: "preview",
2190 memory_id: None,
2191 entity_id: None,
2192 entities: None,
2193 rels: None,
2194 chars_before: None,
2195 chars_after: None,
2196 cost_usd: None,
2197 elapsed_ms: None,
2198 error: None,
2199 index: idx,
2200 total,
2201 });
2202 }
2203 emit_json(&EnrichSummary {
2204 summary: true,
2205 operation: format!("{:?}", args.operation),
2206 items_total: total,
2207 completed: 0,
2208 failed: 0,
2209 skipped: 0,
2210 cost_usd: 0.0,
2211 elapsed_ms: started.elapsed().as_millis() as u64,
2212 backend_invoked: take_enrich_backend(),
2213 });
2214 return Ok(());
2215 }
2216
2217 let queue_conn = open_queue_db(DEFAULT_QUEUE_DB)?;
2221
2222 if args.resume {
2223 let reset = queue_conn
2224 .execute(
2225 "UPDATE queue SET status='pending' WHERE status='processing'",
2226 [],
2227 )
2228 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
2229 if reset > 0 {
2230 tracing::info!(target: "enrich", count = reset, "reset stuck processing items to pending");
2231 }
2232 }
2233
2234 if args.retry_failed {
2235 let count = queue_conn
2236 .execute(
2237 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
2238 [],
2239 )
2240 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
2241 tracing::info!(target: "enrich", count, "retrying failed items");
2242 }
2243
2244 if !args.resume && !args.retry_failed && !args.until_empty {
2245 queue_conn
2246 .execute("DELETE FROM queue", [])
2247 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
2248 }
2249
2250 for (idx, key) in scan_result.iter().enumerate() {
2252 let item_type = match args.operation {
2253 EnrichOperation::EntityDescriptions => "entity",
2254 _ => "memory",
2255 };
2256 if let Err(e) = queue_conn.execute(
2257 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
2258 rusqlite::params![key, item_type],
2259 ) {
2260 tracing::warn!(target: "enrich", error = %e, "queue insert failed");
2261 }
2262 let _ = idx; }
2264
2265 let parallelism = if args.mode == EnrichMode::OpenRouter {
2268 let rest = args.rest_concurrency.unwrap_or(8).clamp(1, 16) as usize;
2269 tracing::info!(
2270 target: "enrich",
2271 concurrency = rest,
2272 source = "rest_concurrency",
2273 "OpenRouter REST concurrency (clamp 1..=16)"
2274 );
2275 rest
2276 } else {
2277 let p = args.llm_parallelism.clamp(1, 32) as usize;
2278 tracing::info!(
2279 target: "enrich",
2280 concurrency = p,
2281 source = "llm_parallelism",
2282 "LLM subprocess parallelism (clamp 1..=32)"
2283 );
2284 p
2285 };
2286 if parallelism > 1 {
2287 tracing::info!(
2288 target: "enrich",
2289 llm_parallelism = parallelism,
2290 "parallel LLM processing with bounded thread pool"
2291 );
2292 }
2293 if parallelism > 4 {
2297 match args.mode {
2298 EnrichMode::ClaudeCode => {
2299 tracing::warn!(
2300 target: "enrich",
2301 llm_parallelism = parallelism,
2302 recommended_max = 4,
2303 mode = "claude-code",
2304 "llm_parallelism above 4 multiplies Claude Code subprocess fan-out; \
2305 consider combining with SQLITE_GRAPHRAG_CLAUDE_EMPTY_CONFIG_DIR \
2306 to cut MCP children (G28-A)"
2307 );
2308 }
2309 EnrichMode::Codex if parallelism > 16 => {
2310 tracing::warn!(
2311 target: "enrich",
2312 llm_parallelism = parallelism,
2313 recommended_max = 16,
2314 mode = "codex",
2315 "llm_parallelism above 16 risks OAuth rate-limit on Codex; \
2316 consider --llm-parallelism 8 for safer concurrency"
2317 );
2318 }
2319 EnrichMode::Codex => {
2320 }
2324 EnrichMode::Opencode if parallelism > 16 => {
2325 tracing::warn!(
2326 target: "enrich",
2327 llm_parallelism = parallelism,
2328 recommended_max = 16,
2329 mode = "opencode",
2330 "llm_parallelism above 16 risks OAuth rate-limit on OpenCode; \
2331 consider --llm-parallelism 8 for safer concurrency"
2332 );
2333 }
2334 EnrichMode::Opencode => {
2335 }
2337 EnrichMode::OpenRouter => {
2338 }
2341 }
2342 }
2343
2344 let mut completed = 0usize;
2345 let mut failed = 0usize;
2346 let mut skipped = 0usize;
2347 let mut cost_total = 0.0f64;
2348 let mut oauth_detected = false;
2349 let mut backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2350 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2351 let enrich_started = std::time::Instant::now();
2352
2353 let provider_timeout = match args.mode {
2354 EnrichMode::ClaudeCode => args.claude_timeout,
2355 EnrichMode::Codex => args.codex_timeout,
2356 EnrichMode::Opencode => args.opencode_timeout,
2357 EnrichMode::OpenRouter => args.openrouter_timeout,
2358 };
2359
2360 let provider_model: Option<&str> = match args.mode {
2361 EnrichMode::ClaudeCode => args.claude_model.as_deref(),
2362 EnrichMode::Codex => args.codex_model.as_deref(),
2363 EnrichMode::Opencode => args.opencode_model.as_deref(),
2364 EnrichMode::OpenRouter => args.openrouter_model.as_deref(),
2365 };
2366
2367 let until_deadline = std::time::Instant::now()
2371 + std::time::Duration::from_secs(args.max_runtime.unwrap_or(3600));
2372 loop {
2373 if args.until_empty {
2374 let rescan = scan_operation(&conn, &namespace, args)?;
2378 let rescan_item_type = match args.operation {
2379 EnrichOperation::EntityDescriptions => "entity",
2380 _ => "memory",
2381 };
2382 for key in &rescan {
2383 let _ = queue_conn.execute(
2384 "INSERT OR IGNORE INTO queue (item_key, item_type, status) VALUES (?1, ?2, 'pending')",
2385 rusqlite::params![key, rescan_item_type],
2386 );
2387 }
2388 }
2389 let completed_before = completed;
2390
2391 if parallelism > 1 {
2395 let stdout_mu = parking_lot::Mutex::new(());
2396 let budget = args.max_cost_usd;
2397 let operation = args.operation.clone();
2398 let mode = args.mode.clone();
2399 let min_oc = args.min_output_chars;
2400 let max_oc = args.max_output_chars;
2401 let prompt_tpl = args.prompt_template.as_deref().map(|p| p.to_path_buf());
2402
2403 struct WorkerResult {
2404 completed: usize,
2405 failed: usize,
2406 skipped: usize,
2407 cost: f64,
2408 oauth: bool,
2409 }
2410
2411 let results: Vec<WorkerResult> = std::thread::scope(|s| {
2412 let handles: Vec<_> = (0..parallelism)
2413 .map(|worker_id| {
2414 let stdout_mu = &stdout_mu;
2415 let paths = &paths;
2416 let namespace = &namespace;
2417 let provider_binary = provider_binary.as_deref();
2418 let operation = &operation;
2419 let mode = &mode;
2420 let prompt_tpl = prompt_tpl.as_deref();
2421 s.spawn(move || {
2422 let w_conn = match open_rw(&paths.db) {
2423 Ok(c) => c,
2424 Err(e) => {
2425 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open DB");
2426 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2427 }
2428 };
2429 let w_queue = match open_queue_db(DEFAULT_QUEUE_DB) {
2430 Ok(c) => c,
2431 Err(e) => {
2432 tracing::error!(target: "enrich", worker = worker_id, error = %e, "worker failed to open queue DB");
2433 return WorkerResult { completed: 0, failed: 0, skipped: 0, cost: 0.0, oauth: false };
2434 }
2435 };
2436 let mut w_completed = 0usize;
2437 let mut w_failed = 0usize;
2438 let mut w_skipped = 0usize;
2439 let mut w_cost = 0.0f64;
2440 let mut w_oauth = false;
2441 let mut w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2442 let w_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
2443 let mut w_breaker = crate::retry::CircuitBreaker::new(
2449 args.circuit_breaker_threshold.max(1),
2450 std::time::Duration::from_secs(60),
2451 );
2452
2453 loop {
2454 if crate::shutdown_requested() {
2455 tracing::info!(target: "enrich", "shutdown requested, worker stopping");
2456 break;
2457 }
2458 if let Some(b) = budget {
2459 if !w_oauth && w_cost >= b {
2460 break;
2461 }
2462 }
2463 let pending: Option<(i64, String, String, i64)> = w_queue
2464 .query_row(
2465 "UPDATE queue SET status='processing', attempt=attempt+1 \
2466 WHERE id = (SELECT id FROM queue WHERE status='pending' \
2467 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
2468 ORDER BY id LIMIT 1) \
2469 RETURNING id, item_key, item_type, attempt",
2470 [],
2471 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
2472 )
2473 .ok();
2474 let (queue_id, item_key, _item_type, attempt_current) = match pending {
2475 Some(p) => p,
2476 None => break,
2477 };
2478 let item_started = Instant::now();
2479 let current_index = w_completed + w_failed + w_skipped;
2480
2481 let call_result = match operation {
2482 EnrichOperation::MemoryBindings => call_memory_bindings(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2483 EnrichOperation::EntityDescriptions => call_entity_description(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2484 EnrichOperation::BodyEnrich => call_body_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode, min_oc, max_oc, prompt_tpl, args.preserve_threshold, paths, llm_backend, embedding_backend),
2485 EnrichOperation::ReEmbed => call_reembed(&w_conn, namespace, &item_key, paths, llm_backend, embedding_backend),
2486 EnrichOperation::WeightCalibrate => call_weight_calibrate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2487 EnrichOperation::RelationReclassify => call_relation_reclassify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2488 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => call_entity_connect(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2489 EnrichOperation::EntityTypeValidate => call_entity_type_validate(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2490 EnrichOperation::DescriptionEnrich => call_description_enrich(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2491 EnrichOperation::DomainClassify => call_domain_classify(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2492 EnrichOperation::GraphAudit => call_graph_audit(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2493 EnrichOperation::DeepResearchSynth => call_deep_research_synth(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2494 EnrichOperation::BodyExtract => call_body_extract(&w_conn, namespace, &item_key, provider_binary.expect("provider binary required"), provider_model, provider_timeout, mode),
2495 };
2496
2497 match call_result {
2498 Ok(EnrichItemResult::Done { cost, is_oauth, memory_id, entity_id, entities, rels, chars_before, chars_after }) => {
2499 if is_oauth { w_oauth = true; }
2500 w_backoff = DEFAULT_RATE_LIMIT_WAIT;
2501 let _ = w_queue.execute(
2502 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2503 rusqlite::params![memory_id, entity_id, entities as i64, rels as i64, cost, item_started.elapsed().as_millis() as i64, queue_id],
2504 );
2505 w_completed += 1;
2506 if !is_oauth { w_cost += cost; }
2507 let _ = w_breaker
2509 .record(crate::retry::AttemptOutcome::Success);
2510 let _guard = stdout_mu.lock();
2511 emit_json(&ItemEvent { item: &item_key, status: "done", memory_id, entity_id, entities: Some(entities), rels: Some(rels), chars_before, chars_after, cost_usd: if is_oauth { None } else { Some(cost) }, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2512 }
2513 Ok(EnrichItemResult::Skipped { reason }) => {
2514 w_skipped += 1;
2515 let _ = w_queue.execute("UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2", rusqlite::params![reason, queue_id]);
2516 let _guard = stdout_mu.lock();
2517 emit_json(&ItemEvent { item: &item_key, status: "skipped", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: None, index: current_index, total });
2518 }
2519 Ok(EnrichItemResult::PreservationFailed { score, threshold, chars_before, chars_after }) => {
2520 w_skipped += 1;
2526 let reason = format!(
2527 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2528 );
2529 let _ = w_queue.execute(
2530 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2531 rusqlite::params![reason, queue_id],
2532 );
2533 let _guard = stdout_mu.lock();
2534 emit_json(&ItemEvent {
2535 item: &item_key,
2536 status: "preservation_failed",
2537 memory_id: None,
2538 entity_id: None,
2539 entities: None,
2540 rels: None,
2541 chars_before: Some(chars_before),
2542 chars_after: Some(chars_after),
2543 cost_usd: None,
2544 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2545 error: Some(reason),
2546 index: current_index,
2547 total,
2548 });
2549 }
2550 Err(e) => {
2551 let err_str = format!("{e}");
2552 if matches!(e, AppError::RateLimited { .. }) {
2553 if crate::retry::is_kill_switch_active() {
2554 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2555 } else if std::time::Instant::now() >= w_deadline {
2556 tracing::error!(target: "enrich", "rate-limit retry deadline (1h) exhausted in worker");
2557 } else {
2558 let half = w_backoff / 2;
2559 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2560 let actual_wait = half + jitter;
2561 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited in worker, backing off");
2562 let _ = w_queue.execute("UPDATE queue SET status='pending' WHERE id=?1", rusqlite::params![queue_id]);
2563 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2564 w_backoff = (w_backoff * 2).min(900);
2565 continue;
2566 }
2567 }
2568 w_failed += 1;
2569 let outcome = record_item_failure(&w_queue, queue_id, attempt_current, args.max_attempts, &e);
2570 let _guard = stdout_mu.lock();
2571 emit_json(&ItemEvent { item: &item_key, status: "failed", memory_id: None, entity_id: None, entities: None, rels: None, chars_before: None, chars_after: None, cost_usd: None, elapsed_ms: Some(item_started.elapsed().as_millis() as u64), error: Some(err_str), index: current_index, total });
2572 let breaker_opened = w_breaker.record(outcome);
2575 if breaker_opened {
2576 tracing::error!(target: "enrich",
2577 consecutive_failures = w_breaker.consecutive_failures(),
2578 "circuit breaker opened — aborting worker"
2579 );
2580 break;
2581 }
2582 }
2583 }
2584 }
2585 WorkerResult { completed: w_completed, failed: w_failed, skipped: w_skipped, cost: w_cost, oauth: w_oauth }
2586 })
2587 })
2588 .collect();
2589 handles
2590 .into_iter()
2591 .map(|h| {
2592 h.join().unwrap_or(WorkerResult {
2593 completed: 0,
2594 failed: 0,
2595 skipped: 0,
2596 cost: 0.0,
2597 oauth: false,
2598 })
2599 })
2600 .collect()
2601 });
2602
2603 for r in &results {
2604 completed += r.completed;
2605 failed += r.failed;
2606 skipped += r.skipped;
2607 cost_total += r.cost;
2608 if r.oauth && !oauth_detected {
2609 oauth_detected = true;
2610 }
2611 }
2612 } else {
2613 loop {
2615 if crate::shutdown_requested() {
2616 tracing::info!(target: "enrich", "shutdown requested, stopping enrichment");
2617 break;
2618 }
2619
2620 if let Some(budget) = args.max_cost_usd {
2622 if !oauth_detected && cost_total >= budget {
2623 tracing::warn!(target: "enrich", spent = cost_total, budget, "budget exceeded, stopping");
2624 break;
2625 }
2626 }
2627
2628 let pending: Option<(i64, String, String, i64)> = queue_conn
2630 .query_row(
2631 "UPDATE queue SET status='processing', attempt=attempt+1 \
2632 WHERE id = (SELECT id FROM queue WHERE status='pending' \
2633 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
2634 ORDER BY id LIMIT 1) \
2635 RETURNING id, item_key, item_type, attempt",
2636 [],
2637 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
2638 )
2639 .ok();
2640
2641 let (queue_id, item_key, item_type, attempt_current) = match pending {
2642 Some(p) => p,
2643 None => break,
2644 };
2645
2646 let item_started = Instant::now();
2647 let current_index = completed + failed + skipped;
2648
2649 let call_result = match args.operation {
2650 EnrichOperation::MemoryBindings => call_memory_bindings(
2651 &conn,
2652 &namespace,
2653 &item_key,
2654 provider_binary
2655 .as_deref()
2656 .expect("provider binary required"),
2657 provider_model,
2658 provider_timeout,
2659 &args.mode,
2660 ),
2661 EnrichOperation::EntityDescriptions => call_entity_description(
2662 &conn,
2663 &namespace,
2664 &item_key,
2665 provider_binary
2666 .as_deref()
2667 .expect("provider binary required"),
2668 provider_model,
2669 provider_timeout,
2670 &args.mode,
2671 ),
2672 EnrichOperation::BodyEnrich => call_body_enrich(
2673 &conn,
2674 &namespace,
2675 &item_key,
2676 provider_binary
2677 .as_deref()
2678 .expect("provider binary required"),
2679 provider_model,
2680 provider_timeout,
2681 &args.mode,
2682 args.min_output_chars,
2683 args.max_output_chars,
2684 args.prompt_template.as_deref(),
2685 args.preserve_threshold,
2686 &paths,
2687 llm_backend,
2688 embedding_backend,
2689 ),
2690 EnrichOperation::ReEmbed => call_reembed(
2691 &conn,
2692 &namespace,
2693 &item_key,
2694 &paths,
2695 llm_backend,
2696 embedding_backend,
2697 ),
2698 EnrichOperation::WeightCalibrate => call_weight_calibrate(
2699 &conn,
2700 &namespace,
2701 &item_key,
2702 provider_binary
2703 .as_deref()
2704 .expect("provider binary required"),
2705 provider_model,
2706 provider_timeout,
2707 &args.mode,
2708 ),
2709 EnrichOperation::RelationReclassify => call_relation_reclassify(
2710 &conn,
2711 &namespace,
2712 &item_key,
2713 provider_binary
2714 .as_deref()
2715 .expect("provider binary required"),
2716 provider_model,
2717 provider_timeout,
2718 &args.mode,
2719 ),
2720 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
2721 call_entity_connect(
2722 &conn,
2723 &namespace,
2724 &item_key,
2725 provider_binary
2726 .as_deref()
2727 .expect("provider binary required"),
2728 provider_model,
2729 provider_timeout,
2730 &args.mode,
2731 )
2732 }
2733 EnrichOperation::EntityTypeValidate => call_entity_type_validate(
2734 &conn,
2735 &namespace,
2736 &item_key,
2737 provider_binary
2738 .as_deref()
2739 .expect("provider binary required"),
2740 provider_model,
2741 provider_timeout,
2742 &args.mode,
2743 ),
2744 EnrichOperation::DescriptionEnrich => call_description_enrich(
2745 &conn,
2746 &namespace,
2747 &item_key,
2748 provider_binary
2749 .as_deref()
2750 .expect("provider binary required"),
2751 provider_model,
2752 provider_timeout,
2753 &args.mode,
2754 ),
2755 EnrichOperation::DomainClassify => call_domain_classify(
2756 &conn,
2757 &namespace,
2758 &item_key,
2759 provider_binary
2760 .as_deref()
2761 .expect("provider binary required"),
2762 provider_model,
2763 provider_timeout,
2764 &args.mode,
2765 ),
2766 EnrichOperation::GraphAudit => call_graph_audit(
2767 &conn,
2768 &namespace,
2769 &item_key,
2770 provider_binary
2771 .as_deref()
2772 .expect("provider binary required"),
2773 provider_model,
2774 provider_timeout,
2775 &args.mode,
2776 ),
2777 EnrichOperation::DeepResearchSynth => call_deep_research_synth(
2778 &conn,
2779 &namespace,
2780 &item_key,
2781 provider_binary
2782 .as_deref()
2783 .expect("provider binary required"),
2784 provider_model,
2785 provider_timeout,
2786 &args.mode,
2787 ),
2788 EnrichOperation::BodyExtract => call_body_extract(
2789 &conn,
2790 &namespace,
2791 &item_key,
2792 provider_binary
2793 .as_deref()
2794 .expect("provider binary required"),
2795 provider_model,
2796 provider_timeout,
2797 &args.mode,
2798 ),
2799 };
2800
2801 match call_result {
2802 Ok(EnrichItemResult::Done {
2803 memory_id,
2804 entity_id,
2805 entities,
2806 rels,
2807 chars_before,
2808 chars_after,
2809 cost,
2810 is_oauth,
2811 }) => {
2812 if is_oauth && !oauth_detected {
2813 oauth_detected = true;
2814 tracing::info!(target: "enrich", "OAuth subscription detected — cost_usd omitted from output");
2815 }
2816 backoff_secs = DEFAULT_RATE_LIMIT_WAIT;
2817
2818 let persist_err: Option<String> = match args.operation {
2820 EnrichOperation::MemoryBindings => {
2821 None
2823 }
2824 EnrichOperation::EntityDescriptions => {
2825 None
2827 }
2828 EnrichOperation::BodyEnrich => {
2829 None
2831 }
2832 _ => {
2833 None
2835 }
2836 };
2837
2838 if let Err(e) = queue_conn.execute(
2839 "UPDATE queue SET status='done', memory_id=?1, entity_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
2840 rusqlite::params![
2841 memory_id,
2842 entity_id,
2843 entities as i64,
2844 rels as i64,
2845 cost,
2846 item_started.elapsed().as_millis() as i64,
2847 queue_id
2848 ],
2849 ) {
2850 tracing::warn!(target: "enrich", error = %e, "queue done update failed");
2851 }
2852
2853 if persist_err.is_none() {
2854 completed += 1;
2855 if !is_oauth {
2856 cost_total += cost;
2857 }
2858 emit_json(&ItemEvent {
2859 item: &item_key,
2860 status: "done",
2861 memory_id,
2862 entity_id,
2863 entities: Some(entities),
2864 rels: Some(rels),
2865 chars_before,
2866 chars_after,
2867 cost_usd: if is_oauth { None } else { Some(cost) },
2868 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2869 error: None,
2870 index: current_index,
2871 total,
2872 });
2873 } else {
2874 failed += 1;
2875 emit_json(&ItemEvent {
2876 item: &item_key,
2877 status: "failed",
2878 memory_id: None,
2879 entity_id: None,
2880 entities: None,
2881 rels: None,
2882 chars_before: None,
2883 chars_after: None,
2884 cost_usd: None,
2885 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2886 error: persist_err,
2887 index: current_index,
2888 total,
2889 });
2890 }
2891 }
2892 Ok(EnrichItemResult::Skipped { reason }) => {
2893 skipped += 1;
2894 if let Err(e) = queue_conn.execute(
2895 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2896 rusqlite::params![reason, queue_id],
2897 ) {
2898 tracing::warn!(target: "enrich", error = %e, "queue skipped update failed");
2899 }
2900 emit_json(&ItemEvent {
2901 item: &item_key,
2902 status: "skipped",
2903 memory_id: None,
2904 entity_id: None,
2905 entities: None,
2906 rels: None,
2907 chars_before: None,
2908 chars_after: None,
2909 cost_usd: None,
2910 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2911 error: None,
2912 index: current_index,
2913 total,
2914 });
2915 }
2916 Ok(EnrichItemResult::PreservationFailed {
2917 score,
2918 threshold,
2919 chars_before,
2920 chars_after,
2921 }) => {
2922 skipped += 1;
2929 let reason = format!(
2930 "preservation_failed: jaccard={score:.3} threshold={threshold:.3} (orig={chars_before} chars, new={chars_after} chars)"
2931 );
2932 if let Err(qe) = queue_conn.execute(
2933 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
2934 rusqlite::params![reason, queue_id],
2935 ) {
2936 tracing::warn!(target: "enrich", error = %qe, "queue preservation_failed update failed");
2937 }
2938 emit_json(&ItemEvent {
2939 item: &item_key,
2940 status: "preservation_failed",
2941 memory_id: None,
2942 entity_id: None,
2943 entities: None,
2944 rels: None,
2945 chars_before: Some(chars_before),
2946 chars_after: Some(chars_after),
2947 cost_usd: None,
2948 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2949 error: Some(reason),
2950 index: current_index,
2951 total,
2952 });
2953 }
2954 Err(e) => {
2955 let err_str = format!("{e}");
2956 if matches!(e, AppError::RateLimited { .. }) {
2957 if crate::retry::is_kill_switch_active() {
2958 tracing::warn!(target: "enrich", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
2959 } else if std::time::Instant::now() >= rate_limit_deadline {
2960 tracing::error!(target: "enrich", total_elapsed_secs = enrich_started.elapsed().as_secs(), "rate-limit retry deadline (1h) exhausted");
2961 } else {
2962 let half = backoff_secs / 2;
2963 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
2964 let actual_wait = half + jitter;
2965 tracing::warn!(target: "enrich", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
2966 if let Err(qe) = queue_conn.execute(
2967 "UPDATE queue SET status='pending' WHERE id=?1",
2968 rusqlite::params![queue_id],
2969 ) {
2970 tracing::warn!(target: "enrich", error = %qe, "queue pending update failed");
2971 }
2972 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
2973 backoff_secs = (backoff_secs * 2).min(900);
2974 continue;
2975 }
2976 }
2977
2978 failed += 1;
2979 let _outcome = record_item_failure(
2980 &queue_conn,
2981 queue_id,
2982 attempt_current,
2983 args.max_attempts,
2984 &e,
2985 );
2986 emit_json(&ItemEvent {
2987 item: &item_key,
2988 status: "failed",
2989 memory_id: None,
2990 entity_id: None,
2991 entities: None,
2992 rels: None,
2993 chars_before: None,
2994 chars_after: None,
2995 cost_usd: None,
2996 elapsed_ms: Some(item_started.elapsed().as_millis() as u64),
2997 error: Some(err_str),
2998 index: current_index,
2999 total,
3000 });
3001 }
3002 }
3003
3004 let _ = item_type; }
3006 } if !args.until_empty {
3009 break;
3010 }
3011 let eligible_remaining: i64 = queue_conn
3012 .query_row(
3013 "SELECT COUNT(*) FROM queue WHERE status='pending' \
3014 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))",
3015 [],
3016 |r| r.get(0),
3017 )
3018 .unwrap_or(0);
3019 let progressed = completed > completed_before;
3020 if std::time::Instant::now() >= until_deadline {
3021 tracing::info!(target: "enrich", "until-empty: max-runtime reached, stopping");
3022 break;
3023 }
3024 if !progressed && eligible_remaining == 0 {
3025 tracing::info!(target: "enrich", "until-empty: converged (no eligible items remain)");
3026 break;
3027 }
3028 if eligible_remaining == 0 {
3029 std::thread::sleep(std::time::Duration::from_secs(1));
3031 }
3032 } let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
3035 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
3036
3037 emit_json(&EnrichSummary {
3038 summary: true,
3039 operation: format!("{:?}", args.operation),
3040 items_total: total,
3041 completed,
3042 failed,
3043 skipped,
3044 cost_usd: cost_total,
3045 elapsed_ms: started.elapsed().as_millis() as u64,
3046 backend_invoked: take_enrich_backend(),
3047 });
3048
3049 if failed == 0 {
3050 let dead: i64 = queue_conn
3053 .query_row("SELECT COUNT(*) FROM queue WHERE status='dead'", [], |r| {
3054 r.get(0)
3055 })
3056 .unwrap_or(0);
3057 if dead == 0 {
3058 let _ = std::fs::remove_file(DEFAULT_QUEUE_DB);
3059 }
3060 }
3061
3062 Ok(())
3063}
3064
/// Outcome of processing a single queued enrichment item.
///
/// Returned by the per-operation `call_*` helpers and consumed by the
/// enrichment loop, which maps each variant onto a queue status and an
/// emitted item event.
///
/// `Debug`, `Clone` and `PartialEq` are derived so results can be logged,
/// duplicated and compared (for example with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq)]
enum EnrichItemResult {
    /// The item was processed successfully.
    Done {
        /// Id of the memory row the item refers to, if it was a memory.
        memory_id: Option<i64>,
        /// Id of the entity row the item refers to, if it was an entity.
        entity_id: Option<i64>,
        /// Number of entities reported for this item.
        entities: usize,
        /// Number of relationships reported for this item.
        rels: usize,
        /// Body length in characters before the operation, when reported.
        chars_before: Option<usize>,
        /// Body length in characters after the operation, when reported.
        chars_after: Option<usize>,
        /// Cost reported for the call.
        cost: f64,
        /// When `true`, the enrichment loop leaves `cost` out of its running
        /// total and omits `cost_usd` from the emitted event.
        is_oauth: bool,
    },
    /// The item was intentionally left untouched.
    Skipped {
        /// Human-readable explanation stored in the queue's `error` column.
        reason: String,
    },
    /// A rewritten body was rejected by the preservation check; the
    /// enrichment loop counts this as a skip and reports the status
    /// `preservation_failed`.
    PreservationFailed {
        /// Score produced by the preservation check.
        score: f64,
        /// Threshold the score was evaluated against.
        threshold: f64,
        /// Length of the original body in characters.
        chars_before: usize,
        /// Length of the rejected body in characters.
        chars_after: usize,
    },
}
3094
3095fn call_memory_bindings(
3100 conn: &Connection,
3101 namespace: &str,
3102 memory_name: &str,
3103 binary: &Path,
3104 model: Option<&str>,
3105 timeout: u64,
3106 mode: &EnrichMode,
3107) -> Result<EnrichItemResult, AppError> {
3108 let (memory_id, body): (i64, String) = conn.query_row(
3110 "SELECT id, COALESCE(body,'') FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
3111 rusqlite::params![namespace, memory_name],
3112 |r| Ok((r.get(0)?, r.get(1)?)),
3113 ).map_err(|e| match e {
3114 rusqlite::Error::QueryReturnedNoRows => AppError::NotFound(format!("memory '{memory_name}' not found")),
3115 other => AppError::Database(other),
3116 })?;
3117
3118 if body.trim().is_empty() {
3119 return Ok(EnrichItemResult::Skipped {
3120 reason: "body is empty".to_string(),
3121 });
3122 }
3123
3124 let (value, cost, is_oauth) = match mode {
3125 EnrichMode::ClaudeCode => call_claude(
3126 binary,
3127 BINDINGS_PROMPT,
3128 BINDINGS_SCHEMA,
3129 &body,
3130 model,
3131 timeout,
3132 )?,
3133 EnrichMode::Codex => call_codex(
3134 binary,
3135 BINDINGS_PROMPT,
3136 BINDINGS_SCHEMA,
3137 &body,
3138 model,
3139 timeout,
3140 )?,
3141 EnrichMode::Opencode => call_opencode(
3142 binary,
3143 BINDINGS_PROMPT,
3144 BINDINGS_SCHEMA,
3145 &body,
3146 model,
3147 timeout,
3148 )?,
3149 EnrichMode::OpenRouter => {
3150 call_openrouter(BINDINGS_PROMPT, BINDINGS_SCHEMA, &body, model, timeout)?
3151 }
3152 };
3153
3154 let empty_arr = serde_json::Value::Array(vec![]);
3155 let entities_val = value.get("entities").unwrap_or(&empty_arr);
3156 let rels_val = value.get("relationships").unwrap_or(&empty_arr);
3157
3158 let (ent_count, rel_count) =
3159 persist_memory_bindings(conn, namespace, memory_id, entities_val, rels_val)?;
3160
3161 Ok(EnrichItemResult::Done {
3162 memory_id: Some(memory_id),
3163 entity_id: None,
3164 entities: ent_count,
3165 rels: rel_count,
3166 chars_before: None,
3167 chars_after: None,
3168 cost,
3169 is_oauth,
3170 })
3171}
3172
3173fn call_entity_description(
3174 conn: &Connection,
3175 namespace: &str,
3176 entity_name: &str,
3177 binary: &Path,
3178 model: Option<&str>,
3179 timeout: u64,
3180 mode: &EnrichMode,
3181) -> Result<EnrichItemResult, AppError> {
3182 let (entity_id, entity_type): (i64, String) = conn
3183 .query_row(
3184 "SELECT id, type FROM entities WHERE namespace=?1 AND name=?2",
3185 rusqlite::params![namespace, entity_name],
3186 |r| Ok((r.get(0)?, r.get(1)?)),
3187 )
3188 .map_err(|e| match e {
3189 rusqlite::Error::QueryReturnedNoRows => {
3190 AppError::NotFound(format!("entity '{entity_name}' not found"))
3191 }
3192 other => AppError::Database(other),
3193 })?;
3194
3195 let prompt = format!(
3196 "{ENTITY_DESCRIPTION_PROMPT_PREFIX}{entity_name}\nEntity type: {entity_type}\n\nGenerate a description:"
3197 );
3198
3199 let (value, cost, is_oauth) = match mode {
3200 EnrichMode::ClaudeCode => call_claude(
3201 binary,
3202 &prompt,
3203 ENTITY_DESCRIPTION_SCHEMA,
3204 "",
3205 model,
3206 timeout,
3207 )?,
3208 EnrichMode::Codex => call_codex(
3209 binary,
3210 &prompt,
3211 ENTITY_DESCRIPTION_SCHEMA,
3212 "",
3213 model,
3214 timeout,
3215 )?,
3216 EnrichMode::Opencode => call_opencode(
3217 binary,
3218 &prompt,
3219 ENTITY_DESCRIPTION_SCHEMA,
3220 "",
3221 model,
3222 timeout,
3223 )?,
3224 EnrichMode::OpenRouter => {
3225 call_openrouter(&prompt, ENTITY_DESCRIPTION_SCHEMA, "", model, timeout)?
3226 }
3227 };
3228
3229 let description = value
3230 .get("description")
3231 .and_then(|v| v.as_str())
3232 .ok_or_else(|| AppError::Validation("LLM result missing 'description' field".into()))?;
3233
3234 persist_entity_description(conn, entity_id, description)?;
3235
3236 Ok(EnrichItemResult::Done {
3237 memory_id: None,
3238 entity_id: Some(entity_id),
3239 entities: 0,
3240 rels: 0,
3241 chars_before: None,
3242 chars_after: None,
3243 cost,
3244 is_oauth,
3245 })
3246}
3247
3248#[allow(clippy::too_many_arguments)]
3249fn call_body_enrich(
3250 conn: &Connection,
3251 namespace: &str,
3252 memory_name: &str,
3253 binary: &Path,
3254 model: Option<&str>,
3255 timeout: u64,
3256 mode: &EnrichMode,
3257 min_output_chars: usize,
3258 max_output_chars: usize,
3259 prompt_template: Option<&Path>,
3260 preserve_threshold: f64,
3261 paths: &crate::paths::AppPaths,
3262 llm_backend: crate::cli::LlmBackendChoice,
3263 embedding_backend: crate::cli::EmbeddingBackendChoice,
3264) -> Result<EnrichItemResult, AppError> {
3265 let (memory_id, body, description, memory_type): (i64, String, String, String) = conn
3266 .query_row(
3267 "SELECT id, COALESCE(body,''), COALESCE(description,''), COALESCE(type,'note') \
3268 FROM memories WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
3269 rusqlite::params![namespace, memory_name],
3270 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3271 )
3272 .map_err(|e| match e {
3273 rusqlite::Error::QueryReturnedNoRows => {
3274 AppError::NotFound(format!("memory '{memory_name}' not found"))
3275 }
3276 other => AppError::Database(other),
3277 })?;
3278
3279 let chars_before = body.chars().count();
3280
3281 let linked_entities: Vec<String> = {
3283 let mut stmt = conn.prepare_cached(
3284 "SELECT e.name FROM memory_entities me \
3285 JOIN entities e ON e.id = me.entity_id \
3286 WHERE me.memory_id = ?1 LIMIT 10",
3287 )?;
3288 let result: Vec<String> = stmt
3289 .query_map(rusqlite::params![memory_id], |r| r.get::<_, String>(0))?
3290 .filter_map(|r| r.ok())
3291 .collect();
3292 drop(stmt);
3293 result
3294 };
3295
3296 let prompt_prefix = if let Some(tmpl_path) = prompt_template {
3298 let file_size = std::fs::metadata(tmpl_path)
3299 .map_err(|e| {
3300 AppError::Io(std::io::Error::new(
3301 e.kind(),
3302 format!("failed to stat prompt template: {e}"),
3303 ))
3304 })?
3305 .len();
3306 if file_size > MAX_MEMORY_BODY_LEN as u64 {
3307 return Err(AppError::LimitExceeded(
3308 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
3309 ));
3310 }
3311 std::fs::read_to_string(tmpl_path).map_err(|e| {
3312 AppError::Io(std::io::Error::new(
3313 e.kind(),
3314 format!("failed to read prompt template: {e}"),
3315 ))
3316 })?
3317 } else {
3318 BODY_ENRICH_PROMPT_PREFIX.to_string()
3319 };
3320
3321 let context_section = if !linked_entities.is_empty() || !description.is_empty() {
3323 let mut ctx = String::new();
3324 ctx.push_str(&format!(
3325 "\nContext:\n- Memory name: {memory_name}\n- Type: {memory_type}\n"
3326 ));
3327 if !description.is_empty() {
3328 ctx.push_str(&format!("- Description: {description}\n"));
3329 }
3330 ctx.push_str(&format!("- Domain: {namespace}\n"));
3331 if !linked_entities.is_empty() {
3332 ctx.push_str(&format!(
3333 "- Linked entities: {}\n",
3334 linked_entities.join(", ")
3335 ));
3336 }
3337 ctx
3338 } else {
3339 String::new()
3340 };
3341
3342 let prompt = format!(
3343 "{prompt_prefix}{context_section}\nTarget minimum length: {min_output_chars} characters. Maximum: {max_output_chars} characters."
3344 );
3345
3346 let (value, cost, is_oauth) = match mode {
3348 EnrichMode::ClaudeCode => {
3349 call_claude(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3350 }
3351 EnrichMode::Codex => {
3352 call_codex(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3353 }
3354 EnrichMode::Opencode => {
3355 call_opencode(binary, &prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3356 }
3357 EnrichMode::OpenRouter => {
3358 call_openrouter(&prompt, BODY_ENRICH_SCHEMA, &body, model, timeout)?
3359 }
3360 };
3361
3362 let enriched_body = value
3363 .get("enriched_body")
3364 .and_then(|v| v.as_str())
3365 .ok_or_else(|| AppError::Validation("LLM result missing 'enriched_body' field".into()))?;
3366
3367 let chars_after = enriched_body.chars().count();
3368
3369 let threshold = preserve_threshold;
3376 let verdict =
3377 crate::preservation::PreservationVerdict::evaluate(&body, enriched_body, threshold);
3378 if !verdict.is_accepted() {
3379 return Ok(EnrichItemResult::PreservationFailed {
3380 score: match verdict {
3381 crate::preservation::PreservationVerdict::Preserved { score, .. } => score,
3382 crate::preservation::PreservationVerdict::Rejected { score, .. } => score,
3383 crate::preservation::PreservationVerdict::Unchanged { .. } => 1.0,
3384 },
3385 threshold,
3386 chars_before,
3387 chars_after,
3388 });
3389 }
3390
3391 let old_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
3397 let new_hash = blake3::hash(enriched_body.as_bytes()).to_hex().to_string();
3398 if old_hash == new_hash {
3399 return Ok(EnrichItemResult::Skipped {
3400 reason: format!(
3401 "enriched body hash matches original (blake3:{old_hash}); idempotency skip"
3402 ),
3403 });
3404 }
3405
3406 if chars_after <= chars_before {
3408 return Ok(EnrichItemResult::Skipped {
3409 reason: format!(
3410 "enriched body ({chars_after} chars) not longer than original ({chars_before} chars)"
3411 ),
3412 });
3413 }
3414
3415 persist_enriched_body(
3416 conn,
3417 namespace,
3418 memory_id,
3419 memory_name,
3420 enriched_body,
3421 paths,
3422 llm_backend,
3423 embedding_backend,
3424 )?;
3425
3426 Ok(EnrichItemResult::Done {
3427 memory_id: Some(memory_id),
3428 entity_id: None,
3429 entities: 0,
3430 rels: 0,
3431 chars_before: Some(chars_before),
3432 chars_after: Some(chars_after),
3433 cost,
3434 is_oauth,
3435 })
3436}
3437
3438fn call_reembed(
3439 conn: &Connection,
3440 namespace: &str,
3441 memory_name: &str,
3442 paths: &crate::paths::AppPaths,
3443 llm_backend: crate::cli::LlmBackendChoice,
3444 embedding_backend: crate::cli::EmbeddingBackendChoice,
3445) -> Result<EnrichItemResult, AppError> {
3446 let (memory_id, body, memory_type): (i64, String, String) = conn
3447 .query_row(
3448 "SELECT id, COALESCE(body,''), COALESCE(type,'note')
3449 FROM memories
3450 WHERE namespace=?1 AND name=?2 AND deleted_at IS NULL",
3451 rusqlite::params![namespace, memory_name],
3452 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3453 )
3454 .map_err(|e| match e {
3455 rusqlite::Error::QueryReturnedNoRows => {
3456 AppError::NotFound(format!("memory '{memory_name}' not found"))
3457 }
3458 other => AppError::Database(other),
3459 })?;
3460
3461 if body.trim().is_empty() {
3462 return Ok(EnrichItemResult::Skipped {
3463 reason: "body is empty".to_string(),
3464 });
3465 }
3466
3467 reembed_memory_vector(
3468 conn,
3469 namespace,
3470 memory_id,
3471 memory_name,
3472 &memory_type,
3473 &body,
3474 paths,
3475 llm_backend,
3476 embedding_backend,
3477 )?;
3478
3479 Ok(EnrichItemResult::Done {
3480 memory_id: Some(memory_id),
3481 entity_id: None,
3482 entities: 0,
3483 rels: 0,
3484 chars_before: Some(body.chars().count()),
3485 chars_after: Some(body.chars().count()),
3486 cost: 0.0,
3487 is_oauth: true,
3488 })
3489}
3490
3491fn scan_operation(
3496 conn: &Connection,
3497 namespace: &str,
3498 args: &EnrichArgs,
3499) -> Result<Vec<String>, AppError> {
3500 let name_filter = resolve_name_filter(args)?;
3502 match args.operation {
3503 EnrichOperation::MemoryBindings => {
3504 let rows = scan_unbound_memories(conn, namespace, args.limit, &name_filter)?;
3505 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3506 }
3507 EnrichOperation::EntityDescriptions => {
3508 let rows =
3509 scan_entities_without_description(conn, namespace, args.limit, &name_filter)?;
3510 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3511 }
3512 EnrichOperation::BodyEnrich => {
3513 let rows = scan_short_body_memories(
3514 conn,
3515 namespace,
3516 args.min_output_chars,
3517 args.limit,
3518 &name_filter,
3519 )?;
3520 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3521 }
3522 EnrichOperation::ReEmbed => {
3523 let rows = scan_memories_without_embeddings(conn, namespace, args.limit, &name_filter)?;
3524 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3525 }
3526 EnrichOperation::WeightCalibrate => {
3527 let rows = scan_weight_candidates(conn, namespace, args.limit)?;
3528 Ok(rows
3529 .into_iter()
3530 .map(|(id, _, _, _, _)| id.to_string())
3531 .collect())
3532 }
3533 EnrichOperation::RelationReclassify => {
3534 let rows = scan_generic_relations(conn, namespace, args.limit)?;
3535 Ok(rows
3536 .into_iter()
3537 .map(|(id, _, _, _)| id.to_string())
3538 .collect())
3539 }
3540 EnrichOperation::EntityConnect | EnrichOperation::CrossDomainBridges => {
3541 let pairs = scan_isolated_entity_pairs(conn, namespace, args.limit)?;
3542 Ok(pairs.into_iter().map(|(_, name, _, _)| name).collect())
3543 }
3544 EnrichOperation::EntityTypeValidate => {
3545 let rows = scan_entities_for_type_validation(conn, namespace, args.limit)?;
3546 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3547 }
3548 EnrichOperation::DescriptionEnrich => {
3549 let rows = scan_generic_descriptions(conn, namespace, args.limit)?;
3550 Ok(rows.into_iter().map(|(_, name, _)| name).collect())
3551 }
3552 EnrichOperation::DomainClassify
3553 | EnrichOperation::GraphAudit
3554 | EnrichOperation::DeepResearchSynth
3555 | EnrichOperation::BodyExtract => {
3556 let limit_clause = args.limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
3557 let sql = format!(
3558 "SELECT name FROM memories WHERE namespace=?1 AND deleted_at IS NULL ORDER BY id {limit_clause}"
3559 );
3560 let mut stmt = conn.prepare(&sql)?;
3561 let names = stmt
3562 .query_map(rusqlite::params![namespace], |r| r.get::<_, String>(0))?
3563 .collect::<Result<Vec<_>, _>>()?;
3564 Ok(names)
3565 }
3566 }
3567}
3568
3569fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
3575 if let Some(p) = explicit {
3576 if p.exists() {
3577 return Ok(p.to_path_buf());
3578 }
3579 return Err(AppError::Validation(format!(
3580 "Codex binary not found at explicit path: {}",
3581 p.display()
3582 )));
3583 }
3584
3585 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
3586 let p = PathBuf::from(&env_path);
3587 if p.exists() {
3588 return Ok(p);
3589 }
3590 }
3591
3592 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
3593 if let Some(path_var) = std::env::var_os("PATH") {
3594 for dir in std::env::split_paths(&path_var) {
3595 let candidate = dir.join(name);
3596 if candidate.exists() {
3597 return Ok(crate::extract::llm_embedding::resolve_real_binary(
3598 &candidate,
3599 ));
3600 }
3601 }
3602 }
3603
3604 Err(AppError::Validation(
3605 "Codex CLI binary not found in PATH. Install it or specify --codex-binary".to_string(),
3606 ))
3607}
3608
3609fn call_weight_calibrate(
3611 conn: &Connection,
3612 _namespace: &str,
3613 item_key: &str,
3614 binary: &Path,
3615 model: Option<&str>,
3616 timeout: u64,
3617 mode: &EnrichMode,
3618) -> Result<EnrichItemResult, AppError> {
3619 let rel_id: i64 = item_key
3620 .parse()
3621 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3622 let (source_name, target_name, relation, current_weight): (String, String, String, f64) = conn
3623 .query_row(
3624 "SELECT e1.name, e2.name, r.relation, r.weight \
3625 FROM relationships r \
3626 JOIN entities e1 ON e1.id = r.source_id \
3627 JOIN entities e2 ON e2.id = r.target_id \
3628 WHERE r.id = ?1",
3629 rusqlite::params![rel_id],
3630 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
3631 )
3632 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3633
3634 let input_text = format!(
3635 "Source: {source_name}\nTarget: {target_name}\nRelation: {relation}\nCurrent weight: {current_weight}"
3636 );
3637 let (value, cost, is_oauth) = match mode {
3638 EnrichMode::ClaudeCode => call_claude(
3639 binary,
3640 WEIGHT_CALIBRATE_PROMPT,
3641 WEIGHT_CALIBRATE_SCHEMA,
3642 &input_text,
3643 model,
3644 timeout,
3645 )?,
3646 EnrichMode::Codex => call_codex(
3647 binary,
3648 WEIGHT_CALIBRATE_PROMPT,
3649 WEIGHT_CALIBRATE_SCHEMA,
3650 &input_text,
3651 model,
3652 timeout,
3653 )?,
3654 EnrichMode::Opencode => call_opencode(
3655 binary,
3656 WEIGHT_CALIBRATE_PROMPT,
3657 WEIGHT_CALIBRATE_SCHEMA,
3658 &input_text,
3659 model,
3660 timeout,
3661 )?,
3662 EnrichMode::OpenRouter => call_openrouter(
3663 WEIGHT_CALIBRATE_PROMPT,
3664 WEIGHT_CALIBRATE_SCHEMA,
3665 &input_text,
3666 model,
3667 timeout,
3668 )?,
3669 };
3670
3671 let calibrated = value
3672 .get("calibrated_weight")
3673 .and_then(|v| v.as_f64())
3674 .ok_or_else(|| AppError::Validation("LLM result missing 'calibrated_weight'".into()))?;
3675
3676 conn.execute(
3677 "UPDATE relationships SET weight = ?1 WHERE id = ?2",
3678 rusqlite::params![calibrated, rel_id],
3679 )?;
3680
3681 Ok(EnrichItemResult::Done {
3682 memory_id: None,
3683 entity_id: None,
3684 entities: 0,
3685 rels: 1,
3686 chars_before: None,
3687 chars_after: None,
3688 cost,
3689 is_oauth,
3690 })
3691}
3692
3693fn call_relation_reclassify(
3695 conn: &Connection,
3696 _namespace: &str,
3697 item_key: &str,
3698 binary: &Path,
3699 model: Option<&str>,
3700 timeout: u64,
3701 mode: &EnrichMode,
3702) -> Result<EnrichItemResult, AppError> {
3703 let rel_id: i64 = item_key
3704 .parse()
3705 .map_err(|_| AppError::Validation(format!("invalid relationship id: {item_key}")))?;
3706 let (source_name, target_name, current_relation): (String, String, String) = conn
3707 .query_row(
3708 "SELECT e1.name, e2.name, r.relation \
3709 FROM relationships r \
3710 JOIN entities e1 ON e1.id = r.source_id \
3711 JOIN entities e2 ON e2.id = r.target_id \
3712 WHERE r.id = ?1",
3713 rusqlite::params![rel_id],
3714 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3715 )
3716 .map_err(|_| AppError::NotFound(format!("relationship {rel_id} not found")))?;
3717
3718 let input_text = format!(
3719 "Source entity: {source_name}\nTarget entity: {target_name}\nCurrent relation: {current_relation}"
3720 );
3721 let (value, cost, is_oauth) = match mode {
3722 EnrichMode::ClaudeCode => call_claude(
3723 binary,
3724 RELATION_RECLASSIFY_PROMPT,
3725 RELATION_RECLASSIFY_SCHEMA,
3726 &input_text,
3727 model,
3728 timeout,
3729 )?,
3730 EnrichMode::Codex => call_codex(
3731 binary,
3732 RELATION_RECLASSIFY_PROMPT,
3733 RELATION_RECLASSIFY_SCHEMA,
3734 &input_text,
3735 model,
3736 timeout,
3737 )?,
3738 EnrichMode::Opencode => call_opencode(
3739 binary,
3740 RELATION_RECLASSIFY_PROMPT,
3741 RELATION_RECLASSIFY_SCHEMA,
3742 &input_text,
3743 model,
3744 timeout,
3745 )?,
3746 EnrichMode::OpenRouter => call_openrouter(
3747 RELATION_RECLASSIFY_PROMPT,
3748 RELATION_RECLASSIFY_SCHEMA,
3749 &input_text,
3750 model,
3751 timeout,
3752 )?,
3753 };
3754
3755 let new_relation = value
3756 .get("relation")
3757 .and_then(|v| v.as_str())
3758 .ok_or_else(|| AppError::Validation("LLM result missing 'relation'".into()))?;
3759 let new_strength = value
3760 .get("strength")
3761 .and_then(|v| v.as_f64())
3762 .unwrap_or(0.5);
3763
3764 conn.execute(
3765 "UPDATE relationships SET relation = ?1, weight = ?2 WHERE id = ?3",
3766 rusqlite::params![new_relation, new_strength, rel_id],
3767 )?;
3768
3769 Ok(EnrichItemResult::Done {
3770 memory_id: None,
3771 entity_id: None,
3772 entities: 0,
3773 rels: 1,
3774 chars_before: None,
3775 chars_after: None,
3776 cost,
3777 is_oauth,
3778 })
3779}
3780
3781fn call_entity_connect(
3783 conn: &Connection,
3784 namespace: &str,
3785 item_key: &str,
3786 binary: &Path,
3787 model: Option<&str>,
3788 timeout: u64,
3789 mode: &EnrichMode,
3790) -> Result<EnrichItemResult, AppError> {
3791 let pairs = scan_isolated_entity_pairs(conn, namespace, Some(1))?;
3792 let (e1_id, e1_name, e2_id, e2_name) =
3793 match pairs.into_iter().find(|(_, n, _, _)| n == item_key) {
3794 Some(p) => p,
3795 None => {
3796 return Ok(EnrichItemResult::Skipped {
3797 reason: "pair no longer isolated".into(),
3798 })
3799 }
3800 };
3801 let input_text = format!("Entity A: {e1_name}\nEntity B: {e2_name}");
3802 let (value, cost, is_oauth) = match mode {
3803 EnrichMode::ClaudeCode => call_claude(
3804 binary,
3805 ENTITY_CONNECT_PROMPT,
3806 ENTITY_CONNECT_SCHEMA,
3807 &input_text,
3808 model,
3809 timeout,
3810 )?,
3811 EnrichMode::Codex => call_codex(
3812 binary,
3813 ENTITY_CONNECT_PROMPT,
3814 ENTITY_CONNECT_SCHEMA,
3815 &input_text,
3816 model,
3817 timeout,
3818 )?,
3819 EnrichMode::Opencode => call_opencode(
3820 binary,
3821 ENTITY_CONNECT_PROMPT,
3822 ENTITY_CONNECT_SCHEMA,
3823 &input_text,
3824 model,
3825 timeout,
3826 )?,
3827 EnrichMode::OpenRouter => call_openrouter(
3828 ENTITY_CONNECT_PROMPT,
3829 ENTITY_CONNECT_SCHEMA,
3830 &input_text,
3831 model,
3832 timeout,
3833 )?,
3834 };
3835 let relation = value
3836 .get("relation")
3837 .and_then(|v| v.as_str())
3838 .unwrap_or("none");
3839 if relation == "none" {
3840 return Ok(EnrichItemResult::Skipped {
3841 reason: "LLM determined no relationship".into(),
3842 });
3843 }
3844 let strength = value
3845 .get("strength")
3846 .and_then(|v| v.as_f64())
3847 .unwrap_or(0.5);
3848 conn.execute(
3849 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
3850 rusqlite::params![namespace, e1_id, e2_id, relation, strength],
3851 )?;
3852 Ok(EnrichItemResult::Done {
3853 memory_id: None,
3854 entity_id: None,
3855 entities: 0,
3856 rels: 1,
3857 chars_before: None,
3858 chars_after: None,
3859 cost,
3860 is_oauth,
3861 })
3862}
3863
3864fn call_entity_type_validate(
3866 conn: &Connection,
3867 _namespace: &str,
3868 item_key: &str,
3869 binary: &Path,
3870 model: Option<&str>,
3871 timeout: u64,
3872 mode: &EnrichMode,
3873) -> Result<EnrichItemResult, AppError> {
3874 let (ent_id, ent_name, ent_type): (i64, String, String) = conn
3875 .query_row(
3876 "SELECT id, name, type FROM entities WHERE name = ?1",
3877 rusqlite::params![item_key],
3878 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
3879 )
3880 .map_err(|_| AppError::NotFound(format!("entity '{item_key}' not found")))?;
3881 let input_text = format!("Entity: {ent_name}\nCurrent type: {ent_type}");
3882 let (value, cost, is_oauth) = match mode {
3883 EnrichMode::ClaudeCode => call_claude(
3884 binary,
3885 ENTITY_TYPE_VALIDATE_PROMPT,
3886 ENTITY_TYPE_VALIDATE_SCHEMA,
3887 &input_text,
3888 model,
3889 timeout,
3890 )?,
3891 EnrichMode::Codex => call_codex(
3892 binary,
3893 ENTITY_TYPE_VALIDATE_PROMPT,
3894 ENTITY_TYPE_VALIDATE_SCHEMA,
3895 &input_text,
3896 model,
3897 timeout,
3898 )?,
3899 EnrichMode::Opencode => call_opencode(
3900 binary,
3901 ENTITY_TYPE_VALIDATE_PROMPT,
3902 ENTITY_TYPE_VALIDATE_SCHEMA,
3903 &input_text,
3904 model,
3905 timeout,
3906 )?,
3907 EnrichMode::OpenRouter => call_openrouter(
3908 ENTITY_TYPE_VALIDATE_PROMPT,
3909 ENTITY_TYPE_VALIDATE_SCHEMA,
3910 &input_text,
3911 model,
3912 timeout,
3913 )?,
3914 };
3915 let validated_type = value
3916 .get("validated_type")
3917 .and_then(|v| v.as_str())
3918 .unwrap_or(&ent_type);
3919 let was_correct = value
3920 .get("was_correct")
3921 .and_then(|v| v.as_bool())
3922 .unwrap_or(true);
3923 if !was_correct {
3924 conn.execute(
3925 "UPDATE entities SET type = ?1 WHERE id = ?2",
3926 rusqlite::params![validated_type, ent_id],
3927 )?;
3928 }
3929 Ok(EnrichItemResult::Done {
3930 memory_id: None,
3931 entity_id: Some(ent_id),
3932 entities: 1,
3933 rels: 0,
3934 chars_before: None,
3935 chars_after: None,
3936 cost,
3937 is_oauth,
3938 })
3939}
3940
3941fn call_description_enrich(
3943 conn: &Connection,
3944 _namespace: &str,
3945 item_key: &str,
3946 binary: &Path,
3947 model: Option<&str>,
3948 timeout: u64,
3949 mode: &EnrichMode,
3950) -> Result<EnrichItemResult, AppError> {
3951 let (mem_id, body, old_desc): (i64, String, String) = conn
3952 .query_row(
3953 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
3954 rusqlite::params![item_key],
3955 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
3956 )
3957 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
3958 let snippet: String = body.chars().take(500).collect();
3959 let input_text = format!(
3960 "Memory name: {item_key}\nCurrent description: {old_desc}\nBody preview: {snippet}"
3961 );
3962 let (value, cost, is_oauth) = match mode {
3963 EnrichMode::ClaudeCode => call_claude(
3964 binary,
3965 DESCRIPTION_ENRICH_PROMPT,
3966 DESCRIPTION_ENRICH_SCHEMA,
3967 &input_text,
3968 model,
3969 timeout,
3970 )?,
3971 EnrichMode::Codex => call_codex(
3972 binary,
3973 DESCRIPTION_ENRICH_PROMPT,
3974 DESCRIPTION_ENRICH_SCHEMA,
3975 &input_text,
3976 model,
3977 timeout,
3978 )?,
3979 EnrichMode::Opencode => call_opencode(
3980 binary,
3981 DESCRIPTION_ENRICH_PROMPT,
3982 DESCRIPTION_ENRICH_SCHEMA,
3983 &input_text,
3984 model,
3985 timeout,
3986 )?,
3987 EnrichMode::OpenRouter => call_openrouter(
3988 DESCRIPTION_ENRICH_PROMPT,
3989 DESCRIPTION_ENRICH_SCHEMA,
3990 &input_text,
3991 model,
3992 timeout,
3993 )?,
3994 };
3995 let new_desc = value
3996 .get("description")
3997 .and_then(|v| v.as_str())
3998 .unwrap_or(&old_desc);
3999 let old_name: String = conn.query_row(
4000 "SELECT name FROM memories WHERE id = ?1",
4001 rusqlite::params![mem_id],
4002 |r| r.get(0),
4003 )?;
4004 conn.execute(
4005 "UPDATE memories SET description = ?1 WHERE id = ?2",
4006 rusqlite::params![new_desc, mem_id],
4007 )?;
4008 memories::sync_fts_after_update(
4009 conn, mem_id, &old_name, &old_desc, &body, &old_name, new_desc, &body,
4010 )?;
4011 Ok(EnrichItemResult::Done {
4012 memory_id: Some(mem_id),
4013 entity_id: None,
4014 entities: 0,
4015 rels: 0,
4016 chars_before: Some(old_desc.len()),
4017 chars_after: Some(new_desc.len()),
4018 cost,
4019 is_oauth,
4020 })
4021}
4022
4023fn call_domain_classify(
4025 conn: &Connection,
4026 _namespace: &str,
4027 item_key: &str,
4028 binary: &Path,
4029 model: Option<&str>,
4030 timeout: u64,
4031 mode: &EnrichMode,
4032) -> Result<EnrichItemResult, AppError> {
4033 let (mem_id, body, desc): (i64, String, String) = conn
4034 .query_row(
4035 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
4036 rusqlite::params![item_key],
4037 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
4038 )
4039 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
4040 let snippet: String = body.chars().take(500).collect();
4041 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nBody preview: {snippet}");
4042 let (value, cost, is_oauth) = match mode {
4043 EnrichMode::ClaudeCode => call_claude(
4044 binary,
4045 DOMAIN_CLASSIFY_PROMPT,
4046 DOMAIN_CLASSIFY_SCHEMA,
4047 &input_text,
4048 model,
4049 timeout,
4050 )?,
4051 EnrichMode::Codex => call_codex(
4052 binary,
4053 DOMAIN_CLASSIFY_PROMPT,
4054 DOMAIN_CLASSIFY_SCHEMA,
4055 &input_text,
4056 model,
4057 timeout,
4058 )?,
4059 EnrichMode::Opencode => call_opencode(
4060 binary,
4061 DOMAIN_CLASSIFY_PROMPT,
4062 DOMAIN_CLASSIFY_SCHEMA,
4063 &input_text,
4064 model,
4065 timeout,
4066 )?,
4067 EnrichMode::OpenRouter => call_openrouter(
4068 DOMAIN_CLASSIFY_PROMPT,
4069 DOMAIN_CLASSIFY_SCHEMA,
4070 &input_text,
4071 model,
4072 timeout,
4073 )?,
4074 };
4075 let domain = value
4076 .get("domain")
4077 .and_then(|v| v.as_str())
4078 .unwrap_or("uncategorized");
4079 let metadata = format!(r#"{{"domain":"{}"}}"#, domain.replace('"', "\\\""));
4080 conn.execute(
4081 "UPDATE memories SET metadata = ?1 WHERE id = ?2",
4082 rusqlite::params![metadata, mem_id],
4083 )?;
4084 Ok(EnrichItemResult::Done {
4085 memory_id: Some(mem_id),
4086 entity_id: None,
4087 entities: 0,
4088 rels: 0,
4089 chars_before: None,
4090 chars_after: None,
4091 cost,
4092 is_oauth,
4093 })
4094}
4095
4096fn call_graph_audit(
4098 conn: &Connection,
4099 _namespace: &str,
4100 item_key: &str,
4101 binary: &Path,
4102 model: Option<&str>,
4103 timeout: u64,
4104 mode: &EnrichMode,
4105) -> Result<EnrichItemResult, AppError> {
4106 let (mem_id, body, desc): (i64, String, String) = conn
4107 .query_row(
4108 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
4109 rusqlite::params![item_key],
4110 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
4111 )
4112 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
4113 let snippet: String = body.chars().take(500).collect();
4114 let ent_count: i64 = conn
4115 .query_row(
4116 "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?1",
4117 rusqlite::params![mem_id],
4118 |r| r.get(0),
4119 )
4120 .unwrap_or(0);
4121 let input_text = format!("Memory: {item_key}\nDescription: {desc}\nEntity bindings: {ent_count}\nBody preview: {snippet}");
4122 let (value, cost, is_oauth) = match mode {
4123 EnrichMode::ClaudeCode => call_claude(
4124 binary,
4125 GRAPH_AUDIT_PROMPT,
4126 GRAPH_AUDIT_SCHEMA,
4127 &input_text,
4128 model,
4129 timeout,
4130 )?,
4131 EnrichMode::Codex => call_codex(
4132 binary,
4133 GRAPH_AUDIT_PROMPT,
4134 GRAPH_AUDIT_SCHEMA,
4135 &input_text,
4136 model,
4137 timeout,
4138 )?,
4139 EnrichMode::Opencode => call_opencode(
4140 binary,
4141 GRAPH_AUDIT_PROMPT,
4142 GRAPH_AUDIT_SCHEMA,
4143 &input_text,
4144 model,
4145 timeout,
4146 )?,
4147 EnrichMode::OpenRouter => call_openrouter(
4148 GRAPH_AUDIT_PROMPT,
4149 GRAPH_AUDIT_SCHEMA,
4150 &input_text,
4151 model,
4152 timeout,
4153 )?,
4154 };
4155 let issues = value
4156 .get("issues")
4157 .and_then(|v| v.as_array())
4158 .map(|a| a.len())
4159 .unwrap_or(0);
4160 Ok(EnrichItemResult::Done {
4161 memory_id: Some(mem_id),
4162 entity_id: None,
4163 entities: 0,
4164 rels: issues,
4165 chars_before: None,
4166 chars_after: None,
4167 cost,
4168 is_oauth,
4169 })
4170}
4171
4172fn call_deep_research_synth(
4174 conn: &Connection,
4175 namespace: &str,
4176 item_key: &str,
4177 binary: &Path,
4178 model: Option<&str>,
4179 timeout: u64,
4180 mode: &EnrichMode,
4181) -> Result<EnrichItemResult, AppError> {
4182 let (mem_id, body): (i64, String) = conn
4183 .query_row(
4184 "SELECT id, body FROM memories WHERE name = ?1 AND deleted_at IS NULL",
4185 rusqlite::params![item_key],
4186 |r| Ok((r.get(0)?, r.get::<_, String>(1)?)),
4187 )
4188 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
4189 let snippet: String = body.chars().take(2000).collect();
4190 let input_text = format!("Memory: {item_key}\nBody:\n{snippet}");
4191 let (value, cost, is_oauth) = match mode {
4192 EnrichMode::ClaudeCode => call_claude(
4193 binary,
4194 DEEP_RESEARCH_SYNTH_PROMPT,
4195 DEEP_RESEARCH_SYNTH_SCHEMA,
4196 &input_text,
4197 model,
4198 timeout,
4199 )?,
4200 EnrichMode::Codex => call_codex(
4201 binary,
4202 DEEP_RESEARCH_SYNTH_PROMPT,
4203 DEEP_RESEARCH_SYNTH_SCHEMA,
4204 &input_text,
4205 model,
4206 timeout,
4207 )?,
4208 EnrichMode::Opencode => call_opencode(
4209 binary,
4210 DEEP_RESEARCH_SYNTH_PROMPT,
4211 DEEP_RESEARCH_SYNTH_SCHEMA,
4212 &input_text,
4213 model,
4214 timeout,
4215 )?,
4216 EnrichMode::OpenRouter => call_openrouter(
4217 DEEP_RESEARCH_SYNTH_PROMPT,
4218 DEEP_RESEARCH_SYNTH_SCHEMA,
4219 &input_text,
4220 model,
4221 timeout,
4222 )?,
4223 };
4224 let mut ent_count = 0usize;
4225 let mut rel_count = 0usize;
4226 if let Some(ents) = value.get("entities").and_then(|v| v.as_array()) {
4227 for e in ents {
4228 let name = e.get("name").and_then(|v| v.as_str()).unwrap_or_default();
4229 let etype_str = e
4230 .get("entity_type")
4231 .and_then(|v| v.as_str())
4232 .unwrap_or("concept");
4233 let etype: EntityType = etype_str.parse().unwrap_or(EntityType::Concept);
4234 if name.len() >= 2 {
4235 let ne = NewEntity {
4236 name: name.to_string(),
4237 entity_type: etype,
4238 description: None,
4239 };
4240 let _ = entities::upsert_entity(conn, namespace, &ne);
4241 ent_count += 1;
4242 }
4243 }
4244 }
4245 if let Some(rels) = value.get("relationships").and_then(|v| v.as_array()) {
4246 for r in rels {
4247 let src = r.get("source").and_then(|v| v.as_str()).unwrap_or_default();
4248 let tgt = r.get("target").and_then(|v| v.as_str()).unwrap_or_default();
4249 if src.is_empty() || tgt.is_empty() {
4250 continue;
4251 }
4252 let rel = r
4253 .get("relation")
4254 .and_then(|v| v.as_str())
4255 .unwrap_or("related");
4256 let str_ = r.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.5);
4257 if let (Some(sid), Some(tid)) = (
4258 entities::find_entity_id(conn, namespace, src)?,
4259 entities::find_entity_id(conn, namespace, tgt)?,
4260 ) {
4261 let _ = entities::create_or_fetch_relationship(
4262 conn, namespace, sid, tid, rel, str_, None,
4263 );
4264 rel_count += 1;
4265 }
4266 }
4267 }
4268 Ok(EnrichItemResult::Done {
4269 memory_id: Some(mem_id),
4270 entity_id: None,
4271 entities: ent_count,
4272 rels: rel_count,
4273 chars_before: None,
4274 chars_after: None,
4275 cost,
4276 is_oauth,
4277 })
4278}
4279
4280fn call_body_extract(
4282 conn: &Connection,
4283 _namespace: &str,
4284 item_key: &str,
4285 binary: &Path,
4286 model: Option<&str>,
4287 timeout: u64,
4288 mode: &EnrichMode,
4289) -> Result<EnrichItemResult, AppError> {
4290 let (mem_id, body, old_desc): (i64, String, String) = conn
4291 .query_row(
4292 "SELECT id, body, description FROM memories WHERE name = ?1 AND deleted_at IS NULL",
4293 rusqlite::params![item_key],
4294 |r| Ok((r.get(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?)),
4295 )
4296 .map_err(|_| AppError::NotFound(format!("memory '{item_key}' not found")))?;
4297 let old_name: String = conn.query_row(
4298 "SELECT name FROM memories WHERE id = ?1",
4299 rusqlite::params![mem_id],
4300 |r| r.get(0),
4301 )?;
4302 let input_text = format!("Memory: {item_key}\nBody:\n{body}");
4303 let (value, cost, is_oauth) = match mode {
4304 EnrichMode::ClaudeCode => call_claude(
4305 binary,
4306 BODY_EXTRACT_PROMPT,
4307 BODY_EXTRACT_SCHEMA,
4308 &input_text,
4309 model,
4310 timeout,
4311 )?,
4312 EnrichMode::Codex => call_codex(
4313 binary,
4314 BODY_EXTRACT_PROMPT,
4315 BODY_EXTRACT_SCHEMA,
4316 &input_text,
4317 model,
4318 timeout,
4319 )?,
4320 EnrichMode::Opencode => call_opencode(
4321 binary,
4322 BODY_EXTRACT_PROMPT,
4323 BODY_EXTRACT_SCHEMA,
4324 &input_text,
4325 model,
4326 timeout,
4327 )?,
4328 EnrichMode::OpenRouter => call_openrouter(
4329 BODY_EXTRACT_PROMPT,
4330 BODY_EXTRACT_SCHEMA,
4331 &input_text,
4332 model,
4333 timeout,
4334 )?,
4335 };
4336 let restructured = value
4337 .get("restructured_body")
4338 .and_then(|v| v.as_str())
4339 .unwrap_or(&body);
4340 let chars_before = body.len();
4341 let chars_after = restructured.len();
4342 let new_hash = blake3::hash(restructured.as_bytes()).to_hex().to_string();
4343 conn.execute(
4344 "UPDATE memories SET body = ?1, body_hash = ?2, updated_at = unixepoch() WHERE id = ?3",
4345 rusqlite::params![restructured, new_hash, mem_id],
4346 )?;
4347 memories::sync_fts_after_update(
4348 conn,
4349 mem_id,
4350 &old_name,
4351 &old_desc,
4352 &body,
4353 &old_name,
4354 &old_desc,
4355 restructured,
4356 )?;
4357 Ok(EnrichItemResult::Done {
4358 memory_id: Some(mem_id),
4359 entity_id: None,
4360 entities: 0,
4361 rels: 0,
4362 chars_before: Some(chars_before),
4363 chars_after: Some(chars_after),
4364 cost,
4365 is_oauth,
4366 })
4367}
4368
4369#[allow(clippy::type_complexity)]
4371fn scan_isolated_entity_pairs(
4372 conn: &Connection,
4373 namespace: &str,
4374 limit: Option<usize>,
4375) -> Result<Vec<(i64, String, i64, String)>, AppError> {
4376 let limit_val = limit.unwrap_or(50) as i64;
4377 let mut stmt = conn.prepare_cached(
4378 "SELECT e1.id, e1.name, e2.id, e2.name FROM entities e1, entities e2 \
4379 WHERE e1.namespace = ?1 AND e2.namespace = ?1 AND e1.id < e2.id \
4380 AND NOT EXISTS (SELECT 1 FROM relationships r WHERE \
4381 (r.source_id = e1.id AND r.target_id = e2.id) OR \
4382 (r.source_id = e2.id AND r.target_id = e1.id)) \
4383 LIMIT ?2",
4384 )?;
4385 let rows = stmt
4386 .query_map(rusqlite::params![namespace, limit_val], |r| {
4387 Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?))
4388 })?
4389 .collect::<Result<Vec<_>, _>>()?;
4390 Ok(rows)
4391}
4392
4393fn scan_entities_for_type_validation(
4395 conn: &Connection,
4396 namespace: &str,
4397 limit: Option<usize>,
4398) -> Result<Vec<(i64, String, String)>, AppError> {
4399 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
4400 let sql = format!(
4401 "SELECT id, name, type FROM entities WHERE namespace = ?1 ORDER BY id {limit_clause}"
4402 );
4403 let mut stmt = conn.prepare(&sql)?;
4404 let rows = stmt
4405 .query_map(rusqlite::params![namespace], |r| {
4406 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
4407 })?
4408 .collect::<Result<Vec<_>, _>>()?;
4409 Ok(rows)
4410}
4411
4412fn scan_generic_descriptions(
4414 conn: &Connection,
4415 namespace: &str,
4416 limit: Option<usize>,
4417) -> Result<Vec<(i64, String, String)>, AppError> {
4418 let limit_clause = limit.map(|n| format!("LIMIT {n}")).unwrap_or_default();
4419 let sql = format!(
4420 "SELECT id, name, description FROM memories WHERE namespace = ?1 AND deleted_at IS NULL \
4421 AND (description LIKE '%ingested%' OR description LIKE '%imported%' OR description LIKE '%added%' OR length(description) < 30) \
4422 ORDER BY id {limit_clause}"
4423 );
4424 let mut stmt = conn.prepare(&sql)?;
4425 let rows = stmt
4426 .query_map(rusqlite::params![namespace], |r| {
4427 Ok((r.get(0)?, r.get(1)?, r.get(2)?))
4428 })?
4429 .collect::<Result<Vec<_>, _>>()?;
4430 Ok(rows)
4431}
4432
4433fn call_codex(
4437 binary: &Path,
4438 prompt: &str,
4439 json_schema: &str,
4440 input_text: &str,
4441 model: Option<&str>,
4442 timeout_secs: u64,
4443) -> Result<(serde_json::Value, f64, bool), AppError> {
4444 use wait_timeout::ChildExt;
4445
4446 super::codex_spawn::validate_codex_model(model)?;
4451 let schema_file = super::codex_spawn::trusted_schema_path()?;
4452
4453 let args = super::codex_spawn::CodexSpawnArgs {
4454 binary,
4455 prompt,
4456 json_schema,
4457 input_text,
4458 model,
4459 timeout_secs,
4460 schema_path: schema_file.clone(),
4461 };
4462 let mut cmd = super::codex_spawn::build_codex_command(&args)?;
4463
4464 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
4465 AppError::Io(std::io::Error::new(
4466 e.kind(),
4467 format!("failed to spawn codex: {e}"),
4468 ))
4469 })?;
4470
4471 let full_prompt = format!("{prompt}\n\n{input_text}");
4472 let stdin_bytes = full_prompt.into_bytes();
4473 let mut child_stdin = child
4474 .stdin
4475 .take()
4476 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
4477 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
4478 child_stdin.write_all(&stdin_bytes)?;
4479 drop(child_stdin);
4480 Ok(())
4481 });
4482
4483 let start = std::time::Instant::now();
4484 let timeout = std::time::Duration::from_secs(timeout_secs);
4485 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4486 let _ = std::fs::remove_file(&schema_file);
4487
4488 match status {
4489 Some(exit_status) => {
4490 stdin_thread
4491 .join()
4492 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
4493 .map_err(AppError::Io)?;
4494
4495 tracing::debug!(
4496 target: "process",
4497 exit_code = ?exit_status.code(),
4498 elapsed_ms = start.elapsed().as_millis() as u64,
4499 "external process completed"
4500 );
4501
4502 let mut stdout_buf = Vec::new();
4503 if let Some(mut out) = child.stdout.take() {
4504 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4505 }
4506 if !exit_status.success() {
4507 let mut stderr_buf = Vec::new();
4508 if let Some(mut err) = child.stderr.take() {
4509 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4510 }
4511 let stderr_str = String::from_utf8_lossy(&stderr_buf);
4512 tracing::warn!(
4513 target: "enrich",
4514 exit_code = ?exit_status.code(),
4515 stderr = %stderr_str.trim(),
4516 "codex process failed"
4517 );
4518 return Err(AppError::Validation(format!(
4519 "codex exited with code {:?}: {}",
4520 exit_status.code(),
4521 stderr_str.trim()
4522 )));
4523 }
4524 let stdout_str = String::from_utf8(stdout_buf)
4525 .map_err(|_| AppError::Validation("codex stdout is not valid UTF-8".into()))?;
4526 let result = super::codex_spawn::parse_codex_jsonl(&stdout_str)?;
4529 let value: serde_json::Value =
4535 serde_json::from_str(&result.last_agent_text).map_err(|e| {
4536 AppError::Validation(format!(
4537 "codex agent_message is not valid JSON: {e}; raw={}",
4538 result.last_agent_text
4539 ))
4540 })?;
4541 Ok((value, 0.0, false))
4542 }
4543 None => {
4544 let _ = child.kill();
4545 let _ = child.wait();
4546 let _ = stdin_thread.join();
4547 Err(AppError::Validation(format!(
4548 "codex timed out after {timeout_secs} seconds"
4549 )))
4550 }
4551 }
4552}
4553
4554fn call_opencode(
4555 binary: &Path,
4556 prompt: &str,
4557 json_schema: &str,
4558 input_text: &str,
4559 model: Option<&str>,
4560 timeout_secs: u64,
4561) -> Result<(serde_json::Value, f64, bool), AppError> {
4562 use wait_timeout::ChildExt;
4563
4564 let resolved_model = super::opencode_runner::resolve_opencode_model(model);
4565
4566 let augmented_prompt = if json_schema.is_empty() {
4567 prompt.to_string()
4568 } else {
4569 format!(
4570 "{prompt}\n\nIMPORTANT: You MUST respond with ONLY valid JSON (no markdown, no explanation, no code fences). \
4571 The JSON MUST match this schema:\n{json_schema}"
4572 )
4573 };
4574
4575 let mut cmd = super::opencode_runner::build_opencode_command_sync(
4576 binary,
4577 &resolved_model,
4578 &augmented_prompt,
4579 input_text,
4580 )?;
4581
4582 let mut child = super::opencode_runner::spawn_opencode(&mut cmd).map_err(|e| {
4583 AppError::Io(std::io::Error::new(
4584 e.kind(),
4585 format!("failed to spawn opencode: {e}"),
4586 ))
4587 })?;
4588
4589 let start = std::time::Instant::now();
4590 let timeout = std::time::Duration::from_secs(timeout_secs);
4591 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
4592
4593 match status {
4594 Some(exit_status) => {
4595 tracing::debug!(
4596 target: "process",
4597 exit_code = ?exit_status.code(),
4598 elapsed_ms = start.elapsed().as_millis() as u64,
4599 "opencode process completed"
4600 );
4601
4602 let mut stdout_buf = Vec::new();
4603 if let Some(mut out) = child.stdout.take() {
4604 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
4605 }
4606 if !exit_status.success() {
4607 let mut stderr_buf = Vec::new();
4608 if let Some(mut err) = child.stderr.take() {
4609 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
4610 }
4611 let stderr_str = String::from_utf8_lossy(&stderr_buf);
4612 tracing::warn!(
4613 target: "enrich",
4614 exit_code = ?exit_status.code(),
4615 stderr = %stderr_str.trim(),
4616 "opencode process failed"
4617 );
4618 return Err(AppError::Validation(format!(
4619 "opencode exited with code {:?}: {}",
4620 exit_status.code(),
4621 stderr_str.trim()
4622 )));
4623 }
4624 let stdout_str = String::from_utf8(stdout_buf)
4625 .map_err(|_| AppError::Validation("opencode stdout is not valid UTF-8".into()))?;
4626 let (text, cost, _tokens) = super::opencode_runner::parse_opencode_output(&stdout_str)?;
4627 let value: serde_json::Value =
4628 super::opencode_runner::parse_json_from_opencode_text(&text).map_err(|e| {
4629 AppError::Validation(format!("opencode response is not valid JSON: {e}"))
4630 })?;
4631 Ok((value, cost, false))
4632 }
4633 None => {
4634 let _ = child.kill();
4635 let _ = child.wait();
4636 Err(AppError::Validation(format!(
4637 "opencode timed out after {timeout_secs} seconds"
4638 )))
4639 }
4640 }
4641}
4642
4643#[cfg(test)]
4648mod tests {
4649 use super::*;
4650 use rusqlite::Connection;
4651 #[cfg(unix)]
4652 use std::os::unix::fs::PermissionsExt;
4653
4654 fn open_test_db() -> Connection {
4656 let conn = Connection::open_in_memory().expect("in-memory db");
4657 conn.execute_batch(
4658 "CREATE TABLE memories (
4659 id INTEGER PRIMARY KEY AUTOINCREMENT,
4660 namespace TEXT NOT NULL DEFAULT 'global',
4661 name TEXT NOT NULL,
4662 type TEXT NOT NULL DEFAULT 'note',
4663 description TEXT NOT NULL DEFAULT '',
4664 body TEXT NOT NULL DEFAULT '',
4665 body_hash TEXT NOT NULL DEFAULT '',
4666 session_id TEXT,
4667 source TEXT NOT NULL DEFAULT 'agent',
4668 metadata TEXT NOT NULL DEFAULT '{}',
4669 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
4670 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
4671 deleted_at INTEGER,
4672 UNIQUE(namespace, name)
4673 );
4674 CREATE TABLE entities (
4675 id INTEGER PRIMARY KEY AUTOINCREMENT,
4676 namespace TEXT NOT NULL DEFAULT 'global',
4677 name TEXT NOT NULL,
4678 type TEXT NOT NULL DEFAULT 'concept',
4679 description TEXT,
4680 degree INTEGER NOT NULL DEFAULT 0,
4681 created_at INTEGER NOT NULL DEFAULT (unixepoch()),
4682 updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
4683 UNIQUE(namespace, name)
4684 );
4685 CREATE TABLE memory_entities (
4686 memory_id INTEGER NOT NULL,
4687 entity_id INTEGER NOT NULL,
4688 PRIMARY KEY (memory_id, entity_id)
4689 );
4690 CREATE TABLE relationships (
4691 id INTEGER PRIMARY KEY AUTOINCREMENT,
4692 namespace TEXT NOT NULL DEFAULT 'global',
4693 source_id INTEGER NOT NULL,
4694 target_id INTEGER NOT NULL,
4695 relation TEXT NOT NULL,
4696 weight REAL NOT NULL DEFAULT 0.5,
4697 description TEXT,
4698 UNIQUE(source_id, target_id, relation)
4699 );
4700 CREATE TABLE memory_embeddings (
4701 memory_id INTEGER PRIMARY KEY,
4702 namespace TEXT NOT NULL,
4703 embedding BLOB NOT NULL,
4704 source TEXT NOT NULL,
4705 model TEXT NOT NULL DEFAULT '',
4706 dim INTEGER NOT NULL DEFAULT 384,
4707 created_at INTEGER NOT NULL DEFAULT (unixepoch())
4708 );",
4709 )
4710 .expect("schema creation must succeed");
4711 conn
4712 }
4713
4714 #[test]
4715 fn scan_unbound_memories_finds_memories_without_bindings() {
4716 let conn = open_test_db();
4717 conn.execute(
4718 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'test-mem', 'some body content')",
4719 [],
4720 )
4721 .unwrap();
4722
4723 let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4724 assert_eq!(results.len(), 1);
4725 assert_eq!(results[0].1, "test-mem");
4726 }
4727
4728 #[test]
4729 fn scan_unbound_memories_excludes_bound_memories() {
4730 let conn = open_test_db();
4731 conn.execute(
4732 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'bound-mem', 'body')",
4733 [],
4734 )
4735 .unwrap();
4736 let mem_id: i64 = conn
4737 .query_row("SELECT id FROM memories WHERE name='bound-mem'", [], |r| {
4738 r.get(0)
4739 })
4740 .unwrap();
4741 conn.execute(
4742 "INSERT INTO entities (namespace, name) VALUES ('global', 'some-entity')",
4743 [],
4744 )
4745 .unwrap();
4746 let ent_id: i64 = conn
4747 .query_row(
4748 "SELECT id FROM entities WHERE name='some-entity'",
4749 [],
4750 |r| r.get(0),
4751 )
4752 .unwrap();
4753 conn.execute(
4754 "INSERT INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
4755 rusqlite::params![mem_id, ent_id],
4756 )
4757 .unwrap();
4758
4759 let results = scan_unbound_memories(&conn, "global", None, &[]).unwrap();
4760 assert!(results.is_empty(), "bound memory must not appear in scan");
4761 }
4762
4763 #[test]
4764 fn scan_entities_without_description_finds_null_description() {
4765 let conn = open_test_db();
4766 conn.execute(
4767 "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'my-tool', 'tool', NULL)",
4768 [],
4769 )
4770 .unwrap();
4771
4772 let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4773 assert_eq!(results.len(), 1);
4774 assert_eq!(results[0].1, "my-tool");
4775 }
4776
4777 #[test]
4778 fn scan_entities_without_description_excludes_entities_with_description() {
4779 let conn = open_test_db();
4780 conn.execute(
4781 "INSERT INTO entities (namespace, name, type, description) VALUES ('global', 'described-tool', 'tool', 'Has a description already')",
4782 [],
4783 )
4784 .unwrap();
4785
4786 let results = scan_entities_without_description(&conn, "global", None, &[]).unwrap();
4787 assert!(
4788 results.is_empty(),
4789 "entity with description must not appear"
4790 );
4791 }
4792
4793 #[test]
4794 fn scan_short_body_memories_finds_short_bodies() {
4795 let conn = open_test_db();
4796 conn.execute(
4797 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'short-mem', 'hi')",
4798 [],
4799 )
4800 .unwrap();
4801
4802 let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4803 assert_eq!(results.len(), 1);
4804 assert_eq!(results[0].1, "short-mem");
4805 }
4806
4807 #[test]
4808 fn scan_short_body_memories_excludes_long_bodies() {
4809 let conn = open_test_db();
4810 let long_body = "a".repeat(1000);
4811 conn.execute(
4812 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'long-mem', ?1)",
4813 rusqlite::params![long_body],
4814 )
4815 .unwrap();
4816
4817 let results = scan_short_body_memories(&conn, "global", 100, None, &[]).unwrap();
4818 assert!(results.is_empty(), "long memory must not appear in scan");
4819 }
4820
4821 #[test]
4822 fn scan_respects_limit() {
4823 let conn = open_test_db();
4824 for i in 0..5 {
4825 conn.execute(
4826 &format!("INSERT INTO memories (namespace, name, body) VALUES ('global', 'mem-{i}', 'short')"),
4827 [],
4828 )
4829 .unwrap();
4830 }
4831
4832 let results = scan_short_body_memories(&conn, "global", 1000, Some(3), &[]).unwrap();
4833 assert_eq!(results.len(), 3, "limit must be respected");
4834 }
4835
4836 #[test]
4837 fn scan_memories_without_embeddings_finds_only_missing_rows() {
4838 let conn = open_test_db();
4839 conn.execute(
4840 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'missing-vec', 'body one')",
4841 [],
4842 )
4843 .unwrap();
4844 conn.execute(
4845 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'has-vec', 'body two')",
4846 [],
4847 )
4848 .unwrap();
4849 let memory_id: i64 = conn
4850 .query_row(
4851 "SELECT id FROM memories WHERE namespace='global' AND name='has-vec'",
4852 [],
4853 |r| r.get(0),
4854 )
4855 .unwrap();
4856 let embedding = vec![0.0_f32; crate::constants::embedding_dim()];
4857 memories::upsert_vec(
4858 &conn, memory_id, "global", "note", &embedding, "has-vec", "body two",
4859 )
4860 .unwrap();
4861
4862 let results = scan_memories_without_embeddings(&conn, "global", None, &[]).unwrap();
4863 assert_eq!(results.len(), 1);
4864 assert_eq!(results[0].1, "missing-vec");
4865 }
4866
4867 #[test]
4868 fn scan_memories_without_embeddings_respects_name_filter() {
4869 let conn = open_test_db();
4870 conn.execute(
4871 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'match-me', 'body one')",
4872 [],
4873 )
4874 .unwrap();
4875 conn.execute(
4876 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'skip-me', 'body two')",
4877 [],
4878 )
4879 .unwrap();
4880
4881 let results =
4882 scan_memories_without_embeddings(&conn, "global", None, &["match-me".to_string()])
4883 .unwrap();
4884 assert_eq!(results.len(), 1);
4885 assert_eq!(results[0].1, "match-me");
4886 }
4887
4888 #[test]
4889 fn queue_db_schema_creates_correctly() {
4890 let tmp_path = format!("/tmp/test-enrich-queue-{}.sqlite", std::process::id());
4891 let conn = open_queue_db(&tmp_path).expect("queue db must open");
4892 let count: i64 = conn
4893 .query_row("SELECT COUNT(*) FROM queue", [], |r| r.get(0))
4894 .unwrap();
4895 assert_eq!(count, 0);
4896 let _ = std::fs::remove_file(&tmp_path);
4897 }
4898
4899 #[test]
4900 fn parse_claude_output_valid_bindings() {
4901 let output = r#"[
4902 {"type":"system","subtype":"init"},
4903 {"type":"result","is_error":false,"total_cost_usd":0.01,
4904 "structured_output":{"entities":[{"name":"rust-lang","entity_type":"tool"}],"relationships":[]}}
4905 ]"#;
4906 let result = crate::commands::claude_runner::parse_claude_output(output)
4907 .expect("must parse successfully");
4908 assert!(result.value.get("entities").is_some());
4909 assert!((result.cost_usd - 0.01).abs() < f64::EPSILON);
4910 assert!(!result.is_oauth);
4911 }
4912
4913 #[test]
4914 fn parse_claude_output_detects_oauth() {
4915 let output = r#"[
4916 {"type":"system","subtype":"init","apiKeySource":"none"},
4917 {"type":"result","is_error":false,"total_cost_usd":0.0,
4918 "structured_output":{"entities":[],"relationships":[]}}
4919 ]"#;
4920 let result = crate::commands::claude_runner::parse_claude_output(output).unwrap();
4921 assert!(result.is_oauth);
4922 }
4923
4924 #[test]
4925 fn parse_claude_output_rate_limit_returns_error() {
4926 let output = r#"[
4927 {"type":"system","subtype":"init"},
4928 {"type":"result","is_error":true,"error":"rate_limit exceeded"}
4929 ]"#;
4930 let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4931 assert!(matches!(err, AppError::RateLimited { .. }));
4932 }
4933
4934 #[test]
4935 fn parse_claude_output_auth_error() {
4936 let output = r#"[
4937 {"type":"system","subtype":"init"},
4938 {"type":"result","is_error":true,"error":"authentication failed"}
4939 ]"#;
4940 let err = crate::commands::claude_runner::parse_claude_output(output).unwrap_err();
4941 assert!(format!("{err}").contains("authentication failed"));
4942 }
4943
4944 #[cfg(unix)]
4945 #[test]
4946 fn call_codex_returns_raw_json_for_body_enrich_schema() {
4947 let tmp = tempfile::tempdir().expect("tempdir");
4948 let binary = tmp.path().join("codex-mock");
4949 std::fs::write(
4950 &binary,
4951 r#"#!/usr/bin/env bash
4952set -euo pipefail
4953cat <<'JSONL'
4954{"type":"thread.started","thread_id":"mock-thread-0"}
4955{"type":"item.completed","item":{"type":"agent_message","text":"{\"enriched_body\":\"expanded body\"}"}}
4956{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}
4957JSONL
4958"#,
4959 )
4960 .expect("mock codex write");
4961 let mut perms = std::fs::metadata(&binary).expect("metadata").permissions();
4962 perms.set_mode(0o755);
4963 std::fs::set_permissions(&binary, perms).expect("chmod");
4964
4965 let (value, cost, is_oauth) =
4966 call_codex(&binary, "prompt", BODY_ENRICH_SCHEMA, "body", None, 5)
4967 .expect("call_codex must accept body-enrich payload");
4968
4969 assert_eq!(value["enriched_body"], "expanded body");
4970 assert_eq!(cost, 0.0);
4971 assert!(!is_oauth);
4972 }
4973
4974 #[test]
4975 fn dry_run_emits_preview_without_calling_llm() {
4976 let conn = open_test_db();
4981 conn.execute(
4982 "INSERT INTO memories (namespace, name, body) VALUES ('global', 'dry-mem', 'tiny')",
4983 [],
4984 )
4985 .unwrap();
4986
4987 let results = scan_short_body_memories(&conn, "global", 1000, None, &[]).unwrap();
4988 assert_eq!(results.len(), 1);
4989 assert_eq!(results[0].1, "dry-mem");
4990 }
4993
4994 #[test]
4995 fn persist_entity_description_updates_db() {
4996 let conn = open_test_db();
4997 conn.execute(
4998 "INSERT INTO entities (namespace, name, type) VALUES ('global', 'tokio-runtime', 'tool')",
4999 [],
5000 )
5001 .unwrap();
5002 let eid: i64 = conn
5003 .query_row(
5004 "SELECT id FROM entities WHERE name='tokio-runtime'",
5005 [],
5006 |r| r.get(0),
5007 )
5008 .unwrap();
5009
5010 persist_entity_description(&conn, eid, "Async runtime for Rust applications").unwrap();
5011
5012 let desc: String = conn
5013 .query_row(
5014 "SELECT description FROM entities WHERE id=?1",
5015 rusqlite::params![eid],
5016 |r| r.get(0),
5017 )
5018 .unwrap();
5019 assert_eq!(desc, "Async runtime for Rust applications");
5020 }
5021
5022 #[test]
5023 fn bindings_schema_is_valid_json() {
5024 let _: serde_json::Value =
5025 serde_json::from_str(BINDINGS_SCHEMA).expect("BINDINGS_SCHEMA must be valid JSON");
5026 }
5027
5028 #[test]
5029 fn entity_description_schema_is_valid_json() {
5030 let _: serde_json::Value = serde_json::from_str(ENTITY_DESCRIPTION_SCHEMA)
5031 .expect("ENTITY_DESCRIPTION_SCHEMA must be valid JSON");
5032 }
5033
5034 #[test]
5035 fn body_enrich_schema_is_valid_json() {
5036 let _: serde_json::Value = serde_json::from_str(BODY_ENRICH_SCHEMA)
5037 .expect("BODY_ENRICH_SCHEMA must be valid JSON");
5038 }
5039
5040 fn open_temp_queue() -> (Connection, String) {
5043 let path = format!(
5044 "/tmp/test-enrich-dl-{}-{}.sqlite",
5045 std::process::id(),
5046 fastrand::u64(..)
5047 );
5048 let conn = open_queue_db(&path).expect("queue db must open");
5049 (conn, path)
5050 }
5051
5052 fn insert_pending(conn: &Connection, key: &str) -> i64 {
5053 conn.execute(
5054 "INSERT INTO queue (item_key, item_type, status) VALUES (?1, 'memory', 'pending')",
5055 rusqlite::params![key],
5056 )
5057 .unwrap();
5058 conn.last_insert_rowid()
5059 }
5060
5061 #[test]
5062 fn classify_rate_limit_is_transient() {
5063 let e = AppError::RateLimited {
5064 detail: "429".into(),
5065 };
5066 assert_eq!(
5067 classify_enrich_outcome(&e),
5068 crate::retry::AttemptOutcome::Transient
5069 );
5070 }
5071
5072 #[test]
5073 fn classify_timeout_and_dbbusy_are_transient() {
5074 let t = AppError::Timeout {
5075 operation: "judge".into(),
5076 duration_secs: 30,
5077 };
5078 let b = AppError::DbBusy("locked".into());
5079 assert_eq!(
5080 classify_enrich_outcome(&t),
5081 crate::retry::AttemptOutcome::Transient
5082 );
5083 assert_eq!(
5084 classify_enrich_outcome(&b),
5085 crate::retry::AttemptOutcome::Transient
5086 );
5087 }
5088
5089 #[test]
5090 fn classify_validation_and_parse_are_hard_failure() {
5091 let v = AppError::Validation("failed to parse entities array: bad".into());
5092 assert_eq!(
5093 classify_enrich_outcome(&v),
5094 crate::retry::AttemptOutcome::HardFailure
5095 );
5096 }
5097
5098 #[test]
5099 fn open_queue_db_alter_is_idempotent() {
5100 let path = format!(
5101 "/tmp/test-enrich-idem-{}-{}.sqlite",
5102 std::process::id(),
5103 fastrand::u64(..)
5104 );
5105 let _ = open_queue_db(&path).expect("first open");
5107 let conn = open_queue_db(&path).expect("second open is idempotent");
5109 let cols: Vec<String> = {
5110 let mut stmt = conn.prepare("PRAGMA table_info(queue)").unwrap();
5111 stmt.query_map([], |r| r.get::<_, String>(1))
5112 .unwrap()
5113 .collect::<Result<Vec<_>, _>>()
5114 .unwrap()
5115 };
5116 assert!(cols.iter().any(|c| c == "error_class"));
5117 assert!(cols.iter().any(|c| c == "next_retry_at"));
5118 let _ = std::fs::remove_file(&path);
5119 }
5120
5121 #[test]
5122 fn record_item_failure_hard_marks_dead() {
5123 let (conn, path) = open_temp_queue();
5124 let id = insert_pending(&conn, "mem-hard");
5125 let outcome = record_item_failure(
5126 &conn,
5127 id,
5128 1,
5129 5,
5130 &AppError::Validation("invalid body".into()),
5131 );
5132 assert_eq!(outcome, crate::retry::AttemptOutcome::HardFailure);
5133 let status: String = conn
5134 .query_row(
5135 "SELECT status FROM queue WHERE id=?1",
5136 rusqlite::params![id],
5137 |r| r.get(0),
5138 )
5139 .unwrap();
5140 assert_eq!(status, "dead");
5141 let _ = std::fs::remove_file(&path);
5142 }
5143
5144 #[test]
5145 fn record_item_failure_transient_reschedules_pending() {
5146 let (conn, path) = open_temp_queue();
5147 let id = insert_pending(&conn, "mem-transient");
5148 let outcome = record_item_failure(
5149 &conn,
5150 id,
5151 1,
5152 5,
5153 &AppError::RateLimited {
5154 detail: "429".into(),
5155 },
5156 );
5157 assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
5158 let (status, future): (String, i64) = conn
5159 .query_row(
5160 "SELECT status, (next_retry_at > datetime('now')) FROM queue WHERE id=?1",
5161 rusqlite::params![id],
5162 |r| Ok((r.get(0)?, r.get(1)?)),
5163 )
5164 .unwrap();
5165 assert_eq!(status, "pending");
5166 assert_eq!(future, 1, "next_retry_at must be in the future");
5167 let _ = std::fs::remove_file(&path);
5168 }
5169
5170 #[test]
5171 fn record_item_failure_transient_at_cap_marks_dead() {
5172 let (conn, path) = open_temp_queue();
5173 let id = insert_pending(&conn, "mem-cap");
5174 let outcome = record_item_failure(
5176 &conn,
5177 id,
5178 5,
5179 5,
5180 &AppError::RateLimited {
5181 detail: "429".into(),
5182 },
5183 );
5184 assert_eq!(outcome, crate::retry::AttemptOutcome::Transient);
5185 let status: String = conn
5186 .query_row(
5187 "SELECT status FROM queue WHERE id=?1",
5188 rusqlite::params![id],
5189 |r| r.get(0),
5190 )
5191 .unwrap();
5192 assert_eq!(status, "dead");
5193 let _ = std::fs::remove_file(&path);
5194 }
5195
5196 #[test]
5197 fn dequeue_skips_future_retry_and_dead() {
5198 let (conn, path) = open_temp_queue();
5199 let eligible = insert_pending(&conn, "mem-eligible");
5201 let waiting = insert_pending(&conn, "mem-waiting");
5203 conn.execute(
5204 "UPDATE queue SET next_retry_at=datetime('now', '+3600 seconds') WHERE id=?1",
5205 rusqlite::params![waiting],
5206 )
5207 .unwrap();
5208 let dead = insert_pending(&conn, "mem-dead");
5210 conn.execute(
5211 "UPDATE queue SET status='dead' WHERE id=?1",
5212 rusqlite::params![dead],
5213 )
5214 .unwrap();
5215
5216 let claimed: Option<i64> = conn
5217 .query_row(
5218 "UPDATE queue SET status='processing', attempt=attempt+1 \
5219 WHERE id = (SELECT id FROM queue WHERE status='pending' \
5220 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
5221 ORDER BY id LIMIT 1) \
5222 RETURNING id",
5223 [],
5224 |r| r.get(0),
5225 )
5226 .ok();
5227 assert_eq!(claimed, Some(eligible));
5228
5229 let second: Option<i64> = conn
5231 .query_row(
5232 "UPDATE queue SET status='processing', attempt=attempt+1 \
5233 WHERE id = (SELECT id FROM queue WHERE status='pending' \
5234 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now')) \
5235 ORDER BY id LIMIT 1) \
5236 RETURNING id",
5237 [],
5238 |r| r.get(0),
5239 )
5240 .ok();
5241 assert_eq!(second, None);
5242 let _ = std::fs::remove_file(&path);
5243 }
5244}