1use crate::commands::ingest::IngestArgs;
12use crate::entity_type::EntityType;
13use crate::errors::AppError;
14use crate::paths::AppPaths;
15use crate::storage::connection::{ensure_db_ready, open_rw};
16use crate::storage::entities::{self, NewEntity, NewRelationship};
17use crate::storage::memories::{self, NewMemory};
18
19use rusqlite::Connection;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::{Path, PathBuf};
23use std::process::{Command, Stdio};
24use std::time::Instant;
25
/// Lowest Claude Code CLI version accepted by `validate_claude_version`;
/// a detected version that compares lower is rejected with a validation error.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
27
/// JSON Schema handed to the CLI through `--json-schema`.
///
/// It describes the same shape that is deserialized into [`ExtractionResult`]:
/// a `name`, a `description`, a list of typed entities and a list of typed
/// relationships with a `strength` in `[0, 1]`. The `entity_type` enum is
/// checked against `EntityType` by a unit test in this file.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
69
/// Instruction text passed as the `-p` prompt argument; the document itself is
/// supplied to the CLI on stdin. The relationship types listed here mirror the
/// `relation` enum in `EXTRACTION_SCHEMA`.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
83
/// One element of the JSON array that the CLI prints on stdout.
///
/// Every field is optional (or defaulted) so that elements of any kind
/// deserialize; only the element whose `type` is `"result"` is inspected.
#[derive(Debug, Deserialize)]
struct ClaudeOutputElement {
    /// Element kind; the code looks for the value `"result"`.
    r#type: Option<String>,
    // Deserialized for completeness but never read in this file.
    #[allow(dead_code)]
    subtype: Option<String>,
    /// Error flag; treated as `false` when the key is absent.
    #[serde(default)]
    is_error: bool,
    /// Schema-conforming extraction payload, when the CLI provides one.
    structured_output: Option<ExtractionResult>,
    /// Free-form result text: used as an error message fallback and, when
    /// `structured_output` is missing, parsed as a JSON `ExtractionResult`.
    result: Option<String>,
    /// Cost reported for the call; a missing value is treated as `0.0`.
    total_cost_usd: Option<f64>,
    /// Error message; preferred over `result` when reporting failures.
    error: Option<String>,
}
96
/// Structured extraction for one document, matching `EXTRACTION_SCHEMA`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Name proposed for the document; normalized again before it is stored.
    pub name: String,
    /// Description stored on the resulting memory.
    pub description: String,
    /// Entities found in the document.
    pub entities: Vec<ExtractedEntity>,
    /// Relationships between entities, referenced by entity name.
    pub relationships: Vec<ExtractedRelationship>,
}
104
/// A single entity as returned by the extractor.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedEntity {
    /// Entity name.
    pub name: String,
    /// Entity type as text; parsed into `EntityType` during ingestion, and
    /// entities whose type does not parse are skipped.
    pub entity_type: String,
}
110
/// A single directed relationship as returned by the extractor.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedRelationship {
    /// Name of the source entity.
    pub source: String,
    /// Name of the target entity.
    pub target: String,
    /// Relation label; normalized with `normalize_relation` before storage.
    pub relation: String,
    /// Strength score; the schema constrains it to `[0, 1]`.
    pub strength: f64,
}
118
/// Progress record serialized as one JSON line on stdout when a pipeline
/// phase completes. Fields that are `None` are omitted from the JSON.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label; this file emits `"validate"` and `"scan"`.
    phase: &'a str,
    /// Path of the resolved Claude binary (set for the `validate` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    claude_path: Option<&'a str>,
    /// Version string reported by the binary (set for the `validate` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Directory being ingested (set for the `scan` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of files matched by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Files newly inserted into the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Files that were already present in the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
135
/// Per-file record serialized as one JSON line on stdout.
/// Fields that are `None` are omitted from the JSON.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the file the event refers to.
    file: &'a str,
    /// Memory name; empty when the file failed before a name was derived.
    name: &'a str,
    /// Outcome label; this file emits `"preview"`, `"done"` and `"failed"`.
    status: &'a str,
    /// Id of the memory row written for this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of entities returned by the extractor.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships returned by the extractor.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Cost reported by the CLI for this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this file, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error text for failed files.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Position of this file in the run (count of files handled before it).
    index: usize,
    /// Total number of files matched by the scan.
    total: usize,
}
156
/// Final record serialized as one JSON line on stdout at the end of a run.
#[derive(Debug, Serialize)]
struct Summary {
    /// Marker field; always written as `true` by this file.
    summary: bool,
    /// Total number of files matched by the scan.
    files_total: usize,
    /// Files ingested successfully during this run.
    completed: usize,
    /// Files that failed during this run.
    failed: usize,
    /// Queue rows already marked `done` when processing started.
    skipped: usize,
    /// Sum of entity counts over completed files.
    entities_total: usize,
    /// Sum of relationship counts over completed files.
    rels_total: usize,
    /// Accumulated cost reported by the CLI.
    cost_usd: f64,
    /// Wall-clock duration of the whole run, in milliseconds.
    elapsed_ms: u64,
}
169
170pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
172 if let Some(p) = explicit {
173 if p.exists() {
174 return Ok(p.to_path_buf());
175 }
176 return Err(AppError::Validation(format!(
177 "Claude Code binary not found at explicit path: {}",
178 p.display()
179 )));
180 }
181
182 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
183 let p = PathBuf::from(&env_path);
184 if p.exists() {
185 return Ok(p);
186 }
187 }
188
189 let name = if cfg!(windows) {
190 "claude.exe"
191 } else {
192 "claude"
193 };
194 if let Some(path_var) = std::env::var_os("PATH") {
195 for dir in std::env::split_paths(&path_var) {
196 let candidate = dir.join(name);
197 if candidate.exists() {
198 return Ok(candidate);
199 }
200 }
201 }
202
203 Err(AppError::Validation(
204 "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
205 ))
206}
207
208fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
210 let output = Command::new(binary)
211 .arg("--version")
212 .stdin(Stdio::null())
213 .stdout(Stdio::piped())
214 .stderr(Stdio::piped())
215 .output()
216 .map_err(AppError::Io)?;
217
218 if !output.status.success() {
219 return Err(AppError::Validation(
220 "failed to run 'claude --version'".to_string(),
221 ));
222 }
223
224 let version_str = String::from_utf8(output.stdout)
225 .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
226 let version = version_str.trim().to_string();
227
228 let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
230
231 fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
232 let parts: Vec<&str> = s.splitn(3, '.').collect();
233 if parts.len() < 2 {
234 return None;
235 }
236 let major = parts[0].parse::<u64>().ok()?;
237 let minor = parts[1].parse::<u64>().ok()?;
238 let patch = parts
239 .get(2)
240 .and_then(|p| p.parse::<u64>().ok())
241 .unwrap_or(0);
242 Some((major, minor, patch))
243 }
244
245 if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
246 if actual < min {
247 return Err(AppError::Validation(format!(
248 "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
249 )));
250 }
251 }
252
253 Ok(version)
254}
255
256fn extract_with_claude(
263 binary: &Path,
264 file_content: &[u8],
265 model: Option<&str>,
266 timeout_secs: u64,
267) -> Result<(ExtractionResult, f64), AppError> {
268 use wait_timeout::ChildExt;
269
270 let mut cmd = Command::new(binary);
271
272 cmd.env_clear();
273 for var in &[
274 "PATH",
275 "HOME",
276 "USER",
277 "SHELL",
278 "TERM",
279 "LANG",
280 "XDG_CONFIG_HOME",
281 "XDG_DATA_HOME",
282 "XDG_RUNTIME_DIR",
283 "ANTHROPIC_API_KEY",
284 "CLAUDE_CONFIG_DIR",
285 "TMPDIR",
286 "TMP",
287 "TEMP",
288 "DYLD_FALLBACK_LIBRARY_PATH",
289 ] {
290 if let Ok(val) = std::env::var(var) {
291 cmd.env(var, val);
292 }
293 }
294
295 #[cfg(windows)]
296 for var in &[
297 "LOCALAPPDATA",
298 "APPDATA",
299 "USERPROFILE",
300 "SystemRoot",
301 "COMSPEC",
302 "PATHEXT",
303 "HOMEPATH",
304 "HOMEDRIVE",
305 ] {
306 if let Ok(val) = std::env::var(var) {
307 cmd.env(var, val);
308 }
309 }
310
311 cmd.arg("-p")
312 .arg(EXTRACTION_PROMPT)
313 .arg("--output-format")
314 .arg("json")
315 .arg("--json-schema")
316 .arg(EXTRACTION_SCHEMA)
317 .arg("--max-turns")
318 .arg("3")
319 .arg("--no-session-persistence");
320
321 if std::env::var("ANTHROPIC_API_KEY").is_ok() {
322 cmd.arg("--bare");
323 } else {
324 cmd.arg("--dangerously-skip-permissions");
325 }
326
327 if let Some(m) = model {
328 cmd.arg("--model").arg(m);
329 }
330
331 cmd.stdin(Stdio::piped())
332 .stdout(Stdio::piped())
333 .stderr(Stdio::piped());
334
335 let mut child = cmd.spawn().map_err(|e| {
336 AppError::Io(std::io::Error::new(
337 e.kind(),
338 format!("failed to spawn claude: {e}"),
339 ))
340 })?;
341
342 let stdin_data = file_content.to_vec();
343 let mut child_stdin = child
344 .stdin
345 .take()
346 .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
347 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
348 child_stdin.write_all(&stdin_data)?;
349 Ok(())
350 });
351
352 let timeout = std::time::Duration::from_secs(timeout_secs);
353 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
354
355 match status {
356 Some(exit_status) => {
357 stdin_thread
358 .join()
359 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
360 .map_err(AppError::Io)?;
361
362 let mut stdout_buf = Vec::new();
363 let mut stderr_buf = Vec::new();
364 if let Some(mut out) = child.stdout.take() {
365 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
366 }
367 if let Some(mut err) = child.stderr.take() {
368 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
369 }
370
371 if !exit_status.success() {
372 let stdout_str = String::from_utf8_lossy(&stdout_buf);
373 if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
374 {
375 if let Some(re) = elements
376 .iter()
377 .find(|e| e.r#type.as_deref() == Some("result"))
378 {
379 if re.is_error {
380 let err_msg = re
381 .error
382 .as_deref()
383 .or(re.result.as_deref())
384 .unwrap_or("unknown error");
385 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
386 return Err(AppError::Validation(format!(
387 "RATE_LIMITED: {err_msg}"
388 )));
389 }
390 if err_msg.contains("Not logged in")
391 || err_msg.contains("authentication")
392 {
393 tracing::warn!(
394 target: "ingest",
395 "Claude Code authentication failed. Re-authenticate interactively with: claude"
396 );
397 }
398 return Err(AppError::Validation(format!(
399 "claude -p failed: {err_msg}"
400 )));
401 }
402 }
403 }
404 let stderr_str = String::from_utf8_lossy(&stderr_buf);
405 if stderr_str.contains("auth") || stderr_str.contains("login") {
406 tracing::warn!(
407 target: "ingest",
408 "Claude Code authentication may have failed. Re-authenticate with: claude"
409 );
410 }
411 return Err(AppError::Validation(format!(
412 "claude -p exited with code {:?}: {}",
413 exit_status.code(),
414 stderr_str.trim()
415 )));
416 }
417
418 let stdout = String::from_utf8(stdout_buf)
419 .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
420 parse_claude_output(&stdout)
421 }
422 None => {
423 tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
424 let _ = child.kill();
425 let _ = child.wait();
426 let _ = stdin_thread.join();
427 Err(AppError::Validation(format!(
428 "claude -p timed out after {timeout_secs} seconds"
429 )))
430 }
431 }
432}
433
434fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64), AppError> {
436 let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
437 AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
438 })?;
439
440 let result_elem = elements
441 .iter()
442 .find(|e| e.r#type.as_deref() == Some("result"))
443 .ok_or_else(|| {
444 AppError::Validation("claude output missing 'result' element".to_string())
445 })?;
446
447 if result_elem.is_error {
448 let err_msg = result_elem
449 .error
450 .as_deref()
451 .or(result_elem.result.as_deref())
452 .unwrap_or("unknown error");
453 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
454 return Err(AppError::Validation(format!("RATE_LIMITED: {err_msg}")));
455 }
456 return Err(AppError::Validation(format!(
457 "claude extraction failed: {err_msg}"
458 )));
459 }
460
461 let extraction = result_elem
462 .structured_output
463 .clone()
464 .or_else(|| {
465 result_elem
466 .result
467 .as_ref()
468 .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
469 })
470 .ok_or_else(|| {
471 AppError::Validation("claude result missing structured_output and result field".into())
472 })?;
473
474 let cost = result_elem.total_cost_usd.unwrap_or(0.0);
475
476 Ok((extraction, cost))
477}
478
479fn emit_json<T: Serialize>(value: &T) {
480 if let Ok(json) = serde_json::to_string(value) {
481 let stdout = std::io::stdout();
482 let mut lock = stdout.lock();
483 let _ = writeln!(lock, "{json}");
484 let _ = lock.flush();
485 }
486}
487
488fn collect_matching_files(
490 dir: &Path,
491 pattern: &str,
492 recursive: bool,
493 max_files: usize,
494) -> Result<Vec<PathBuf>, AppError> {
495 let mut files = Vec::new();
496 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
497 files.sort();
498
499 if files.len() > max_files {
500 return Err(AppError::Validation(format!(
501 "found {} files, exceeds --max-files cap of {}",
502 files.len(),
503 max_files
504 )));
505 }
506
507 Ok(files)
508}
509
510fn open_queue_db(path: &str) -> Result<Connection, AppError> {
512 let conn = Connection::open(path)?;
513
514 conn.pragma_update(None, "journal_mode", "wal")?;
515
516 conn.execute_batch(
517 "CREATE TABLE IF NOT EXISTS queue (
518 id INTEGER PRIMARY KEY AUTOINCREMENT,
519 file_path TEXT NOT NULL UNIQUE,
520 name TEXT,
521 status TEXT NOT NULL DEFAULT 'pending',
522 memory_id INTEGER,
523 entities INTEGER DEFAULT 0,
524 rels INTEGER DEFAULT 0,
525 error TEXT,
526 cost_usd REAL DEFAULT 0.0,
527 attempt INTEGER DEFAULT 0,
528 elapsed_ms INTEGER,
529 created_at TEXT DEFAULT (datetime('now')),
530 done_at TEXT
531 );
532 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
533 )?;
534
535 Ok(conn)
536}
537
538pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
540 let started = Instant::now();
541
542 if !args.dir.exists() {
543 return Err(AppError::Validation(format!(
544 "directory not found: {}",
545 args.dir.display()
546 )));
547 }
548
549 let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
551 let version = validate_claude_version(&claude_binary)?;
552 tracing::info!(
553 target: "ingest",
554 binary = %claude_binary.display(),
555 version = %version,
556 "Claude Code binary validated"
557 );
558
559 emit_json(&PhaseEvent {
560 phase: "validate",
561 claude_path: claude_binary.to_str(),
562 version: Some(&version),
563 dir: None,
564 files_total: None,
565 files_new: None,
566 files_existing: None,
567 });
568
569 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
571
572 let queue_conn = open_queue_db(&args.queue_db)?;
573
574 if args.resume {
575 let reset = queue_conn
576 .execute(
577 "UPDATE queue SET status='pending' WHERE status='processing'",
578 [],
579 )
580 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
581 if reset > 0 {
582 tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
583 }
584 }
585
586 if args.retry_failed {
587 let count = queue_conn
588 .execute(
589 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
590 [],
591 )
592 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
593 tracing::info!(target: "ingest", count, "retrying failed files");
594 }
595
596 if !args.resume && !args.retry_failed {
597 queue_conn
598 .execute("DELETE FROM queue", [])
599 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
600 }
601
602 let mut new_count = 0usize;
603 let mut existing_count = 0usize;
604
605 if !args.retry_failed {
606 for file in &files {
607 let file_str = file.to_string_lossy().to_string();
608 let inserted = queue_conn
609 .execute(
610 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
611 rusqlite::params![file_str],
612 )
613 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
614 if inserted > 0 {
615 new_count += 1;
616 } else {
617 existing_count += 1;
618 }
619 }
620 }
621
622 emit_json(&PhaseEvent {
623 phase: "scan",
624 claude_path: None,
625 version: None,
626 dir: args.dir.to_str(),
627 files_total: Some(files.len()),
628 files_new: Some(new_count),
629 files_existing: Some(existing_count),
630 });
631
632 if args.dry_run {
633 for (idx, file) in files.iter().enumerate() {
634 let (name, _truncated, _orig) =
635 super::ingest::derive_kebab_name(file, args.max_name_length);
636 emit_json(&FileEvent {
637 file: &file.to_string_lossy(),
638 name: &name,
639 status: "preview",
640 memory_id: None,
641 entities: None,
642 rels: None,
643 cost_usd: None,
644 elapsed_ms: None,
645 error: None,
646 index: idx,
647 total: files.len(),
648 });
649 }
650 emit_json(&Summary {
651 summary: true,
652 files_total: files.len(),
653 completed: 0,
654 failed: 0,
655 skipped: 0,
656 entities_total: 0,
657 rels_total: 0,
658 cost_usd: 0.0,
659 elapsed_ms: started.elapsed().as_millis() as u64,
660 });
661 if !args.keep_queue {
662 let _ = std::fs::remove_file(&args.queue_db);
663 }
664 return Ok(());
665 }
666
667 let paths = AppPaths::resolve(args.db.as_deref())?;
669 ensure_db_ready(&paths)?;
670 let conn = open_rw(&paths.db)?;
671 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
672 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
673 let memory_type_str = args.r#type.as_str().to_string();
674
675 let mut completed = 0usize;
676 let mut failed = 0usize;
677 let skipped_initial: usize = queue_conn
678 .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
679 r.get::<_, usize>(0)
680 })
681 .unwrap_or(0);
682 let skipped = skipped_initial;
683 let mut entities_total = 0usize;
684 let mut rels_total = 0usize;
685 let mut cost_total = 0.0f64;
686 let total = files.len();
687
688 let mut backoff_secs = args.rate_limit_wait;
689
690 loop {
691 let pending: Option<(i64, String)> = queue_conn
692 .query_row(
693 "UPDATE queue SET status='processing', attempt=attempt+1 \
694 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
695 RETURNING id, file_path",
696 [],
697 |row| Ok((row.get(0)?, row.get(1)?)),
698 )
699 .ok();
700
701 let (queue_id, file_path) = match pending {
702 Some(p) => p,
703 None => break,
704 };
705
706 let file_started = Instant::now();
707
708 const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
710 if let Ok(meta) = std::fs::metadata(&file_path) {
711 if meta.len() > MAX_FILE_SIZE {
712 let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
713 let _ = queue_conn.execute(
714 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
715 rusqlite::params![err_msg, queue_id],
716 );
717 let current_index = completed + failed + skipped;
718 failed += 1;
719 emit_json(&FileEvent {
720 file: &file_path,
721 name: "",
722 status: "failed",
723 memory_id: None,
724 entities: None,
725 rels: None,
726 cost_usd: None,
727 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
728 error: Some(&err_msg),
729 index: current_index,
730 total,
731 });
732 if args.fail_fast {
733 break;
734 }
735 continue;
736 }
737 }
738
739 let file_content = match std::fs::read(&file_path) {
740 Ok(c) => c,
741 Err(e) => {
742 let err_msg = format!("IO error: {e}");
743 let _ = queue_conn.execute(
744 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
745 rusqlite::params![err_msg, queue_id],
746 );
747 let current_index = completed + failed + skipped;
748 failed += 1;
749 emit_json(&FileEvent {
750 file: &file_path,
751 name: "",
752 status: "failed",
753 memory_id: None,
754 entities: None,
755 rels: None,
756 cost_usd: None,
757 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
758 error: Some(&err_msg),
759 index: current_index,
760 total,
761 });
762 if args.fail_fast {
763 break;
764 }
765 continue;
766 }
767 };
768
769 let max_extract_attempts: u32 = 2;
771 let mut extraction_result: Option<(ExtractionResult, f64)> = None;
772 let mut last_extract_err: Option<String> = None;
773
774 for attempt in 1..=max_extract_attempts {
775 match extract_with_claude(
776 &claude_binary,
777 &file_content,
778 args.claude_model.as_deref(),
779 args.claude_timeout,
780 ) {
781 Ok(result) => {
782 extraction_result = Some(result);
783 break;
784 }
785 Err(ref e) if format!("{e}").contains("RATE_LIMITED") => {
786 last_extract_err = Some(format!("{e}"));
787 break;
788 }
789 Err(e) => {
790 let msg = format!("{e}");
791 if attempt < max_extract_attempts {
792 tracing::warn!(target: "ingest", attempt, error = %msg, "extraction failed, retrying (cold-start workaround)");
793 std::thread::sleep(std::time::Duration::from_secs(2));
794 }
795 last_extract_err = Some(msg);
796 }
797 }
798 }
799
800 if let Some((extraction, cost)) = extraction_result {
801 backoff_secs = args.rate_limit_wait;
802
803 let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
804 std::path::Path::new(&extraction.name),
805 args.max_name_length,
806 );
807 let name = &normalized_name;
808 let ent_count = extraction.entities.len();
809 let rel_count = extraction.relationships.len();
810
811 let new_entities: Vec<NewEntity> = extraction
812 .entities
813 .iter()
814 .filter_map(|e| match e.entity_type.parse::<EntityType>() {
815 Ok(et) => Some(NewEntity {
816 name: e.name.clone(),
817 entity_type: et,
818 description: None,
819 }),
820 Err(_) => {
821 tracing::warn!(
822 target: "ingest",
823 entity = %e.name,
824 entity_type = %e.entity_type,
825 "entity type not recognized, skipping"
826 );
827 None
828 }
829 })
830 .collect();
831
832 let new_relationships: Vec<NewRelationship> = extraction
833 .relationships
834 .iter()
835 .map(|r| NewRelationship {
836 source: r.source.clone(),
837 target: r.target.clone(),
838 relation: crate::parsers::normalize_relation(&r.relation),
839 strength: r.strength,
840 description: None,
841 })
842 .collect();
843
844 let body_str = String::from_utf8_lossy(&file_content);
845 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
846 let new_memory = NewMemory {
847 name: name.clone(),
848 namespace: namespace.clone(),
849 memory_type: memory_type_str.clone(),
850 description: extraction.description.clone(),
851 body: body_str.to_string(),
852 body_hash,
853 session_id: None,
854 source: "agent".to_string(),
855 metadata: serde_json::Value::Object(serde_json::Map::new()),
856 };
857
858 let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
860 Some((existing_id, is_deleted)) => {
861 if is_deleted {
862 memories::clear_deleted_at(&conn, existing_id)?;
863 }
864 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
865 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
866 rusqlite::params![existing_id],
867 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
868 )?;
869 memories::update(&conn, existing_id, &new_memory, None)?;
870 memories::sync_fts_after_update(
871 &conn,
872 existing_id,
873 &old_name,
874 &old_desc,
875 &old_body,
876 &new_memory.name,
877 &new_memory.description,
878 &new_memory.body,
879 )?;
880 tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
881 existing_id
882 }
883 None => match memories::insert(&conn, &new_memory) {
884 Ok(id) => id,
885 Err(e) => {
886 let err_msg = format!("{e}");
887 let _ = queue_conn.execute(
888 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
889 rusqlite::params![err_msg, queue_id],
890 );
891 let current_index = completed + failed + skipped;
892 failed += 1;
893 emit_json(&FileEvent {
894 file: &file_path,
895 name,
896 status: "failed",
897 memory_id: None,
898 entities: None,
899 rels: None,
900 cost_usd: Some(cost),
901 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
902 error: Some(&err_msg),
903 index: current_index,
904 total,
905 });
906 cost_total += cost;
907 if args.fail_fast {
908 break;
909 }
910 continue;
911 }
912 },
913 };
914
915 for ent in &new_entities {
916 match entities::upsert_entity(&conn, &namespace, ent) {
917 Ok(eid) => {
918 let _ = entities::link_memory_entity(&conn, memory_id, eid);
919 }
920 Err(e) => {
921 tracing::warn!(
922 target: "ingest",
923 entity = %ent.name,
924 error = %e,
925 "entity skipped due to validation error"
926 );
927 }
928 }
929 }
930 for rel in &new_relationships {
931 crate::parsers::warn_if_non_canonical(&rel.relation);
932 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
933 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
934 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
935 let _ = conn.execute(
936 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
937 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
938 );
939 }
940 }
941
942 let body_text = String::from_utf8_lossy(&file_content).into_owned();
944 let snippet: String = body_text.chars().take(200).collect();
945 let chunks_info =
946 crate::chunking::split_into_chunks_hierarchical(&body_text, tokenizer);
947
948 let embedding_result = if chunks_info.len() <= 1 {
949 crate::daemon::embed_passage_or_local(&paths.models, &body_text)
950 } else {
951 let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
952 let mut multi_ok = true;
953 for chunk in &chunks_info {
954 let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
955 match crate::daemon::embed_passage_or_local(&paths.models, chunk_text) {
956 Ok(emb) => chunk_embeddings.push(emb),
957 Err(e) => {
958 tracing::warn!(
959 target: "ingest",
960 file = %file_path,
961 error = %e,
962 "chunk embedding failed, skipping vector index for this file"
963 );
964 multi_ok = false;
965 break;
966 }
967 }
968 }
969 if multi_ok {
970 let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
971 if let Err(e) = crate::storage::chunks::insert_chunk_slices(
973 &conn,
974 memory_id,
975 &body_text,
976 &chunks_info,
977 ) {
978 tracing::warn!(
979 target: "ingest",
980 file = %file_path,
981 error = %e,
982 "chunk slice insert failed"
983 );
984 } else {
985 for (i, emb) in chunk_embeddings.iter().enumerate() {
986 if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
987 &conn, i as i64, memory_id, i as i32, emb,
988 ) {
989 tracing::warn!(
990 target: "ingest",
991 file = %file_path,
992 chunk = i,
993 error = %e,
994 "chunk vec upsert failed"
995 );
996 }
997 }
998 }
999 Ok(aggregated)
1000 } else {
1001 crate::daemon::embed_passage_or_local(&paths.models, &body_text)
1003 }
1004 };
1005
1006 match embedding_result {
1007 Ok(embedding) => {
1008 if let Err(e) = memories::upsert_vec(
1009 &conn,
1010 memory_id,
1011 &namespace,
1012 &memory_type_str,
1013 &embedding,
1014 name,
1015 &snippet,
1016 ) {
1017 tracing::warn!(
1018 target: "ingest",
1019 file = %file_path,
1020 error = %e,
1021 "memory vec upsert failed; recall may not find this memory"
1022 );
1023 }
1024 for ent in &new_entities {
1026 if let Ok(Some(eid)) =
1027 entities::find_entity_id(&conn, &namespace, &ent.name)
1028 {
1029 let entity_text = ent.name.clone();
1030 match crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
1031 {
1032 Ok(emb) => {
1033 if let Err(e) = entities::upsert_entity_vec(
1034 &conn,
1035 eid,
1036 &namespace,
1037 ent.entity_type,
1038 &emb,
1039 &ent.name,
1040 ) {
1041 tracing::warn!(
1042 target: "ingest",
1043 entity = %ent.name,
1044 error = %e,
1045 "entity vec upsert failed"
1046 );
1047 }
1048 }
1049 Err(e) => {
1050 tracing::warn!(
1051 target: "ingest",
1052 entity = %ent.name,
1053 error = %e,
1054 "entity embedding failed"
1055 );
1056 }
1057 }
1058 }
1059 }
1060 }
1061 Err(e) => {
1062 tracing::warn!(
1063 target: "ingest",
1064 file = %file_path,
1065 error = %e,
1066 "memory embedding failed; recall will not find this memory"
1067 );
1068 }
1069 }
1070
1071 let _ = queue_conn.execute(
1072 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1073 rusqlite::params![
1074 name,
1075 memory_id,
1076 ent_count,
1077 rel_count,
1078 cost,
1079 file_started.elapsed().as_millis() as i64,
1080 queue_id
1081 ],
1082 );
1083
1084 let current_index = completed + failed + skipped;
1085 completed += 1;
1086 entities_total += ent_count;
1087 rels_total += rel_count;
1088 cost_total += cost;
1089
1090 emit_json(&FileEvent {
1091 file: &file_path,
1092 name,
1093 status: "done",
1094 memory_id: Some(memory_id),
1095 entities: Some(ent_count),
1096 rels: Some(rel_count),
1097 cost_usd: Some(cost),
1098 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1099 error: None,
1100 index: current_index,
1101 total,
1102 });
1103 } else if let Some(ref err_str) = last_extract_err {
1104 if err_str.contains("RATE_LIMITED") {
1105 tracing::warn!(
1106 target: "ingest",
1107 wait_seconds = backoff_secs,
1108 "rate limited, waiting before retry"
1109 );
1110 let _ = queue_conn.execute(
1111 "UPDATE queue SET status='pending' WHERE id=?1",
1112 rusqlite::params![queue_id],
1113 );
1114 std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
1115 backoff_secs = (backoff_secs * 2).min(900);
1116 continue;
1117 } else {
1118 let _ = queue_conn.execute(
1119 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1120 rusqlite::params![err_str, queue_id],
1121 );
1122 let current_index = completed + failed + skipped;
1123 failed += 1;
1124 emit_json(&FileEvent {
1125 file: &file_path,
1126 name: "",
1127 status: "failed",
1128 memory_id: None,
1129 entities: None,
1130 rels: None,
1131 cost_usd: None,
1132 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1133 error: Some(err_str),
1134 index: current_index,
1135 total,
1136 });
1137 if args.fail_fast {
1138 break;
1139 }
1140 }
1141 }
1142
1143 if let Some(budget) = args.max_cost_usd {
1144 if cost_total >= budget {
1145 tracing::warn!(
1146 target: "ingest",
1147 spent = cost_total,
1148 budget = budget,
1149 "budget exceeded, stopping"
1150 );
1151 break;
1152 }
1153 }
1154 }
1155
1156 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1158 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1159
1160 emit_json(&Summary {
1161 summary: true,
1162 files_total: total,
1163 completed,
1164 failed,
1165 skipped,
1166 entities_total,
1167 rels_total,
1168 cost_usd: cost_total,
1169 elapsed_ms: started.elapsed().as_millis() as u64,
1170 });
1171
1172 if !args.keep_queue && failed == 0 {
1173 let _ = std::fs::remove_file(&args.queue_db);
1174 }
1175
1176 Ok(())
1177}
1178
#[cfg(test)]
mod tests {
    use super::*;

    /// The schema constant must at least be well-formed JSON.
    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// A successful run with `structured_output` yields the payload and cost.
    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    /// An error element surfaces its `error` text.
    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// Rate-limit errors are tagged so the caller can back off.
    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("RATE_LIMITED"));
    }

    /// Non-JSON output is rejected.
    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    /// With no override and an empty search path, lookup must fail.
    ///
    /// The test mutates process-wide environment variables, so both are
    /// captured up front and restored before asserting.
    #[test]
    fn test_find_claude_binary_not_found() {
        let original_path = std::env::var_os("PATH");
        let original_override = std::env::var_os("SQLITE_GRAPHRAG_CLAUDE_BINARY");
        std::env::set_var("PATH", "/nonexistent");
        std::env::remove_var("SQLITE_GRAPHRAG_CLAUDE_BINARY");
        let result = find_claude_binary(None);
        match original_path {
            Some(p) => std::env::set_var("PATH", p),
            None => std::env::remove_var("PATH"),
        }
        if let Some(v) = original_override {
            std::env::set_var("SQLITE_GRAPHRAG_CLAUDE_BINARY", v);
        }
        assert!(result.is_err());
    }

    /// When `structured_output` is null, the JSON text in `result` is used.
    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost) = parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    /// An error element without `error` falls back to the `result` text.
    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    /// Every entity type offered by the schema must parse as `EntityType`.
    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value = serde_json::from_str(EXTRACTION_SCHEMA).unwrap();
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t.as_str().unwrap();
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}