1use crate::commands::ingest::IngestArgs;
12use crate::entity_type::EntityType;
13use crate::errors::AppError;
14use crate::paths::AppPaths;
15use crate::storage::connection::{ensure_db_ready, open_rw};
16use crate::storage::entities::{self, NewEntity, NewRelationship};
17use crate::storage::memories::{self, NewMemory};
18
19use rusqlite::Connection;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::{Path, PathBuf};
23use std::process::{Command, Stdio};
24use std::time::Instant;
25
/// Oldest Claude Code CLI version accepted by `validate_claude_version`.
///
/// Compared as `(major, minor, patch)`; if either side cannot be parsed the check is
/// skipped rather than failing. NOTE(review): presumably the first release that supports
/// the flags passed in `extract_with_claude` (`--json-schema`, `--no-session-persistence`)
/// — TODO confirm.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
27
/// JSON Schema passed to `claude -p --json-schema` to constrain the extraction output.
///
/// Its shape mirrors `ExtractionResult` / `ExtractedEntity` / `ExtractedRelationship`.
/// Keep the `entity_type` enum parseable as `EntityType` (enforced by the
/// `test_extraction_schema_entity_types_match_enum` test). Keep the `relation` enum in sync
/// with the list in `EXTRACTION_PROMPT`.
const EXTRACTION_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "description": { "type": "string" },
 "entities": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "entity_type": {
 "type": "string",
 "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
 }
 },
 "required": ["name", "entity_type"],
 "additionalProperties": false
 }
 },
 "relationships": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "source": { "type": "string" },
 "target": { "type": "string" },
 "relation": {
 "type": "string",
 "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
 },
 "strength": { "type": "number", "minimum": 0, "maximum": 1 }
 },
 "required": ["source","target","relation","strength"],
 "additionalProperties": false
 }
 }
 },
 "required": ["name","description","entities","relationships"],
 "additionalProperties": false
}"#;
69
/// Instruction text passed as the `-p` argument of `claude`.
///
/// The document itself is written to the child's stdin by `extract_with_claude`. The
/// relationship types listed here must match the `relation` enum in `EXTRACTION_SCHEMA`.
// Each line ends in `\n\`, so Rust drops the source newline and any leading whitespace on
// the next line; the emitted text contains only the explicit `\n` breaks.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
83
/// One element of the JSON printed by `claude -p --output-format json`.
///
/// Only the fields this module reads are declared. Serde ignores any other keys, because
/// the type does not opt into `deny_unknown_fields`.
#[derive(Debug, Deserialize)]
struct ClaudeOutputElement {
    /// Message kind; the element whose type is `"result"` carries the run outcome.
    r#type: Option<String>,
    // Deserialized (and visible in `Debug` output) but never read by this module.
    #[allow(dead_code)]
    subtype: Option<String>,
    /// Whether the CLI reported the run as failed; a missing key deserializes as `false`.
    #[serde(default)]
    is_error: bool,
    /// Structured output requested via `--json-schema`, when the CLI provides it.
    structured_output: Option<ExtractionResult>,
    /// Free-text result.
    ///
    /// Used as a fallback JSON source for the extraction, or as the error text when
    /// `error` is absent.
    result: Option<String>,
    /// Reported cost of the run in USD; callers treat a missing value as 0.0.
    total_cost_usd: Option<f64>,
    /// Error text reported by the CLI; preferred over `result` when `is_error` is set.
    error: Option<String>,
}
96
/// Knowledge-graph extraction for one document, in the shape of `EXTRACTION_SCHEMA`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Suggested memory name; normalized to kebab-case by the caller before storage.
    pub name: String,
    /// One-sentence summary stored as the memory description.
    pub description: String,
    /// Entities mentioned in the document.
    pub entities: Vec<ExtractedEntity>,
    /// Typed edges between entities, referenced by entity name.
    pub relationships: Vec<ExtractedRelationship>,
}

/// An entity as returned by Claude, before `entity_type` is parsed into `EntityType`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedEntity {
    /// Entity name, used as the lookup key when linking relationships.
    pub name: String,
    /// Raw type string; entities whose type fails to parse are skipped by the ingester.
    pub entity_type: String,
}

/// A relationship between two extracted entities, referenced by their names.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedRelationship {
    /// Name of the source entity.
    pub source: String,
    /// Name of the target entity.
    pub target: String,
    /// Relation type, expected to be one of the `relation` values in `EXTRACTION_SCHEMA`.
    pub relation: String,
    /// Edge strength; the schema bounds it to `0..=1`, but nothing here re-validates it.
    pub strength: f64,
}
118
/// NDJSON progress line emitted to stdout at the start of a phase (`validate`, `scan`).
///
/// `None` fields are omitted from the output. Field order defines the JSON key order.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase name.
    phase: &'a str,
    /// Resolved Claude binary path (validate phase; omitted if the path is not UTF-8).
    #[serde(skip_serializing_if = "Option::is_none")]
    claude_path: Option<&'a str>,
    /// Trimmed `claude --version` output (validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Input directory (scan phase; omitted if not UTF-8).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of matching files found (scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Files newly inserted into the queue (scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Files already present in the queue (scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}

/// NDJSON line emitted to stdout for one file.
///
/// Possible statuses are `preview` (dry run), `done`, or `failed`. `None` fields are
/// omitted from the output.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the source file.
    file: &'a str,
    /// Memory name (empty when extraction never produced one).
    name: &'a str,
    /// `preview`, `done`, or `failed`.
    status: &'a str,
    /// Row id of the stored memory (`done` only).
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of entities Claude extracted, before type filtering.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships Claude extracted.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Claude cost for this file in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Failure reason (`failed` only).
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Zero-based position of this file in the run, counting already-done files.
    index: usize,
    /// Total number of matching files.
    total: usize,
}

/// Final NDJSON line emitted to stdout; `summary` is always `true` to mark it.
#[derive(Debug, Serialize)]
struct Summary {
    /// Marker distinguishing the summary from phase/file events.
    summary: bool,
    /// Number of matching files.
    files_total: usize,
    /// Files stored successfully in this run.
    completed: usize,
    /// Files that failed in this run.
    failed: usize,
    /// Files already marked done in the queue before this run started.
    skipped: usize,
    /// Sum of extracted entity counts.
    entities_total: usize,
    /// Sum of extracted relationship counts.
    rels_total: usize,
    /// Total Claude cost in USD for this run.
    cost_usd: f64,
    /// Wall-clock duration of the whole run.
    elapsed_ms: u64,
}
169
170pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
172 if let Some(p) = explicit {
173 if p.exists() {
174 return Ok(p.to_path_buf());
175 }
176 return Err(AppError::Validation(format!(
177 "Claude Code binary not found at explicit path: {}",
178 p.display()
179 )));
180 }
181
182 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
183 let p = PathBuf::from(&env_path);
184 if p.exists() {
185 return Ok(p);
186 }
187 }
188
189 let name = if cfg!(windows) {
190 "claude.exe"
191 } else {
192 "claude"
193 };
194 if let Some(path_var) = std::env::var_os("PATH") {
195 for dir in std::env::split_paths(&path_var) {
196 let candidate = dir.join(name);
197 if candidate.exists() {
198 return Ok(candidate);
199 }
200 }
201 }
202
203 Err(AppError::Validation(
204 "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
205 ))
206}
207
208fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
210 let output = Command::new(binary)
211 .arg("--version")
212 .stdin(Stdio::null())
213 .stdout(Stdio::piped())
214 .stderr(Stdio::piped())
215 .output()
216 .map_err(AppError::Io)?;
217
218 if !output.status.success() {
219 return Err(AppError::Validation(
220 "failed to run 'claude --version'".to_string(),
221 ));
222 }
223
224 let version_str = String::from_utf8(output.stdout)
225 .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
226 let version = version_str.trim().to_string();
227
228 let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
230
231 fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
232 let parts: Vec<&str> = s.splitn(3, '.').collect();
233 if parts.len() < 2 {
234 return None;
235 }
236 let major = parts[0].parse::<u64>().ok()?;
237 let minor = parts[1].parse::<u64>().ok()?;
238 let patch = parts
239 .get(2)
240 .and_then(|p| p.parse::<u64>().ok())
241 .unwrap_or(0);
242 Some((major, minor, patch))
243 }
244
245 if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
246 if actual < min {
247 return Err(AppError::Validation(format!(
248 "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
249 )));
250 }
251 }
252
253 Ok(version)
254}
255
256fn extract_with_claude(
263 binary: &Path,
264 file_content: &[u8],
265 model: Option<&str>,
266 timeout_secs: u64,
267) -> Result<(ExtractionResult, f64), AppError> {
268 use wait_timeout::ChildExt;
269
270 let mut cmd = Command::new(binary);
271
272 cmd.env_clear();
273 for var in &[
274 "PATH",
275 "HOME",
276 "USER",
277 "SHELL",
278 "TERM",
279 "LANG",
280 "XDG_CONFIG_HOME",
281 "XDG_DATA_HOME",
282 "XDG_RUNTIME_DIR",
283 "ANTHROPIC_API_KEY",
284 "CLAUDE_CONFIG_DIR",
285 "TMPDIR",
286 "TMP",
287 "TEMP",
288 "DYLD_FALLBACK_LIBRARY_PATH",
289 ] {
290 if let Ok(val) = std::env::var(var) {
291 cmd.env(var, val);
292 }
293 }
294
295 #[cfg(windows)]
296 for var in &[
297 "LOCALAPPDATA",
298 "APPDATA",
299 "USERPROFILE",
300 "SystemRoot",
301 "COMSPEC",
302 "PATHEXT",
303 "HOMEPATH",
304 "HOMEDRIVE",
305 ] {
306 if let Ok(val) = std::env::var(var) {
307 cmd.env(var, val);
308 }
309 }
310
311 cmd.arg("-p")
312 .arg(EXTRACTION_PROMPT)
313 .arg("--output-format")
314 .arg("json")
315 .arg("--json-schema")
316 .arg(EXTRACTION_SCHEMA)
317 .arg("--max-turns")
318 .arg("3")
319 .arg("--no-session-persistence");
320
321 if std::env::var("ANTHROPIC_API_KEY").is_ok() {
322 cmd.arg("--bare");
323 } else {
324 cmd.arg("--dangerously-skip-permissions");
325 }
326
327 if let Some(m) = model {
328 cmd.arg("--model").arg(m);
329 }
330
331 cmd.stdin(Stdio::piped())
332 .stdout(Stdio::piped())
333 .stderr(Stdio::piped());
334
335 let mut child = cmd.spawn().map_err(|e| {
336 AppError::Io(std::io::Error::new(
337 e.kind(),
338 format!("failed to spawn claude: {e}"),
339 ))
340 })?;
341
342 let stdin_data = file_content.to_vec();
343 let mut child_stdin = child
344 .stdin
345 .take()
346 .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
347 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
348 child_stdin.write_all(&stdin_data)?;
349 Ok(())
350 });
351
352 let timeout = std::time::Duration::from_secs(timeout_secs);
353 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
354
355 match status {
356 Some(exit_status) => {
357 stdin_thread
358 .join()
359 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
360 .map_err(AppError::Io)?;
361
362 let mut stdout_buf = Vec::new();
363 let mut stderr_buf = Vec::new();
364 if let Some(mut out) = child.stdout.take() {
365 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
366 }
367 if let Some(mut err) = child.stderr.take() {
368 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
369 }
370
371 if !exit_status.success() {
372 let stdout_str = String::from_utf8_lossy(&stdout_buf);
373 if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
374 {
375 if let Some(re) = elements
376 .iter()
377 .find(|e| e.r#type.as_deref() == Some("result"))
378 {
379 if re.is_error {
380 let err_msg = re
381 .error
382 .as_deref()
383 .or(re.result.as_deref())
384 .unwrap_or("unknown error");
385 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
386 return Err(AppError::Validation(format!(
387 "RATE_LIMITED: {err_msg}"
388 )));
389 }
390 return Err(AppError::Validation(format!(
391 "claude -p failed: {err_msg}"
392 )));
393 }
394 }
395 }
396 let stderr_str = String::from_utf8_lossy(&stderr_buf);
397 return Err(AppError::Validation(format!(
398 "claude -p exited with code {:?}: {}",
399 exit_status.code(),
400 stderr_str.trim()
401 )));
402 }
403
404 let stdout = String::from_utf8(stdout_buf)
405 .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
406 parse_claude_output(&stdout)
407 }
408 None => {
409 tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
410 let _ = child.kill();
411 let _ = child.wait();
412 let _ = stdin_thread.join();
413 Err(AppError::Validation(format!(
414 "claude -p timed out after {timeout_secs} seconds"
415 )))
416 }
417 }
418}
419
420fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64), AppError> {
422 let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
423 AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
424 })?;
425
426 let result_elem = elements
427 .iter()
428 .find(|e| e.r#type.as_deref() == Some("result"))
429 .ok_or_else(|| {
430 AppError::Validation("claude output missing 'result' element".to_string())
431 })?;
432
433 if result_elem.is_error {
434 let err_msg = result_elem
435 .error
436 .as_deref()
437 .or(result_elem.result.as_deref())
438 .unwrap_or("unknown error");
439 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
440 return Err(AppError::Validation(format!("RATE_LIMITED: {err_msg}")));
441 }
442 return Err(AppError::Validation(format!(
443 "claude extraction failed: {err_msg}"
444 )));
445 }
446
447 let extraction = result_elem
448 .structured_output
449 .clone()
450 .or_else(|| {
451 result_elem
452 .result
453 .as_ref()
454 .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
455 })
456 .ok_or_else(|| {
457 AppError::Validation("claude result missing structured_output and result field".into())
458 })?;
459
460 let cost = result_elem.total_cost_usd.unwrap_or(0.0);
461
462 Ok((extraction, cost))
463}
464
465fn emit_json<T: Serialize>(value: &T) {
466 if let Ok(json) = serde_json::to_string(value) {
467 let stdout = std::io::stdout();
468 let mut lock = stdout.lock();
469 let _ = writeln!(lock, "{json}");
470 let _ = lock.flush();
471 }
472}
473
474fn collect_matching_files(
476 dir: &Path,
477 pattern: &str,
478 recursive: bool,
479 max_files: usize,
480) -> Result<Vec<PathBuf>, AppError> {
481 let mut files = Vec::new();
482 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
483 files.sort();
484
485 if files.len() > max_files {
486 return Err(AppError::Validation(format!(
487 "found {} files, exceeds --max-files cap of {}",
488 files.len(),
489 max_files
490 )));
491 }
492
493 Ok(files)
494}
495
496fn open_queue_db(path: &str) -> Result<Connection, AppError> {
498 let conn = Connection::open(path)?;
499
500 conn.pragma_update(None, "journal_mode", "wal")?;
501
502 conn.execute_batch(
503 "CREATE TABLE IF NOT EXISTS queue (
504 id INTEGER PRIMARY KEY AUTOINCREMENT,
505 file_path TEXT NOT NULL UNIQUE,
506 name TEXT,
507 status TEXT NOT NULL DEFAULT 'pending',
508 memory_id INTEGER,
509 entities INTEGER DEFAULT 0,
510 rels INTEGER DEFAULT 0,
511 error TEXT,
512 cost_usd REAL DEFAULT 0.0,
513 attempt INTEGER DEFAULT 0,
514 elapsed_ms INTEGER,
515 created_at TEXT DEFAULT (datetime('now')),
516 done_at TEXT
517 );
518 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
519 )?;
520
521 Ok(conn)
522}
523
524pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
526 let started = Instant::now();
527
528 if !args.dir.exists() {
529 return Err(AppError::Validation(format!(
530 "directory not found: {}",
531 args.dir.display()
532 )));
533 }
534
535 let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
537 let version = validate_claude_version(&claude_binary)?;
538 tracing::info!(
539 target: "ingest",
540 binary = %claude_binary.display(),
541 version = %version,
542 "Claude Code binary validated"
543 );
544
545 emit_json(&PhaseEvent {
546 phase: "validate",
547 claude_path: claude_binary.to_str(),
548 version: Some(&version),
549 dir: None,
550 files_total: None,
551 files_new: None,
552 files_existing: None,
553 });
554
555 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
557
558 let queue_conn = open_queue_db(&args.queue_db)?;
559
560 if args.resume {
561 let reset = queue_conn
562 .execute(
563 "UPDATE queue SET status='pending' WHERE status='processing'",
564 [],
565 )
566 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
567 if reset > 0 {
568 tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
569 }
570 }
571
572 if args.retry_failed {
573 let count = queue_conn
574 .execute(
575 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
576 [],
577 )
578 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
579 tracing::info!(target: "ingest", count, "retrying failed files");
580 }
581
582 if !args.resume && !args.retry_failed {
583 queue_conn
584 .execute("DELETE FROM queue", [])
585 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
586 }
587
588 let mut new_count = 0usize;
589 let mut existing_count = 0usize;
590
591 if !args.retry_failed {
592 for file in &files {
593 let file_str = file.to_string_lossy().to_string();
594 let inserted = queue_conn
595 .execute(
596 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
597 rusqlite::params![file_str],
598 )
599 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
600 if inserted > 0 {
601 new_count += 1;
602 } else {
603 existing_count += 1;
604 }
605 }
606 }
607
608 emit_json(&PhaseEvent {
609 phase: "scan",
610 claude_path: None,
611 version: None,
612 dir: args.dir.to_str(),
613 files_total: Some(files.len()),
614 files_new: Some(new_count),
615 files_existing: Some(existing_count),
616 });
617
618 if args.dry_run {
619 for (idx, file) in files.iter().enumerate() {
620 let (name, _truncated, _orig) =
621 super::ingest::derive_kebab_name(file, args.max_name_length);
622 emit_json(&FileEvent {
623 file: &file.to_string_lossy(),
624 name: &name,
625 status: "preview",
626 memory_id: None,
627 entities: None,
628 rels: None,
629 cost_usd: None,
630 elapsed_ms: None,
631 error: None,
632 index: idx,
633 total: files.len(),
634 });
635 }
636 emit_json(&Summary {
637 summary: true,
638 files_total: files.len(),
639 completed: 0,
640 failed: 0,
641 skipped: 0,
642 entities_total: 0,
643 rels_total: 0,
644 cost_usd: 0.0,
645 elapsed_ms: started.elapsed().as_millis() as u64,
646 });
647 if !args.keep_queue {
648 let _ = std::fs::remove_file(&args.queue_db);
649 }
650 return Ok(());
651 }
652
653 let paths = AppPaths::resolve(args.db.as_deref())?;
655 ensure_db_ready(&paths)?;
656 let conn = open_rw(&paths.db)?;
657 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
658 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
659 let memory_type_str = args.r#type.as_str().to_string();
660
661 let mut completed = 0usize;
662 let mut failed = 0usize;
663 let skipped_initial: usize = queue_conn
664 .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
665 r.get::<_, usize>(0)
666 })
667 .unwrap_or(0);
668 let skipped = skipped_initial;
669 let mut entities_total = 0usize;
670 let mut rels_total = 0usize;
671 let mut cost_total = 0.0f64;
672 let total = files.len();
673
674 let mut backoff_secs = args.rate_limit_wait;
675
676 loop {
677 let pending: Option<(i64, String)> = queue_conn
678 .query_row(
679 "UPDATE queue SET status='processing', attempt=attempt+1 \
680 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
681 RETURNING id, file_path",
682 [],
683 |row| Ok((row.get(0)?, row.get(1)?)),
684 )
685 .ok();
686
687 let (queue_id, file_path) = match pending {
688 Some(p) => p,
689 None => break,
690 };
691
692 let file_started = Instant::now();
693
694 const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
696 if let Ok(meta) = std::fs::metadata(&file_path) {
697 if meta.len() > MAX_FILE_SIZE {
698 let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
699 let _ = queue_conn.execute(
700 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
701 rusqlite::params![err_msg, queue_id],
702 );
703 let current_index = completed + failed + skipped;
704 failed += 1;
705 emit_json(&FileEvent {
706 file: &file_path,
707 name: "",
708 status: "failed",
709 memory_id: None,
710 entities: None,
711 rels: None,
712 cost_usd: None,
713 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
714 error: Some(&err_msg),
715 index: current_index,
716 total,
717 });
718 if args.fail_fast {
719 break;
720 }
721 continue;
722 }
723 }
724
725 let file_content = match std::fs::read(&file_path) {
726 Ok(c) => c,
727 Err(e) => {
728 let err_msg = format!("IO error: {e}");
729 let _ = queue_conn.execute(
730 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
731 rusqlite::params![err_msg, queue_id],
732 );
733 let current_index = completed + failed + skipped;
734 failed += 1;
735 emit_json(&FileEvent {
736 file: &file_path,
737 name: "",
738 status: "failed",
739 memory_id: None,
740 entities: None,
741 rels: None,
742 cost_usd: None,
743 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
744 error: Some(&err_msg),
745 index: current_index,
746 total,
747 });
748 if args.fail_fast {
749 break;
750 }
751 continue;
752 }
753 };
754
755 let max_extract_attempts: u32 = 2;
757 let mut extraction_result: Option<(ExtractionResult, f64)> = None;
758 let mut last_extract_err: Option<String> = None;
759
760 for attempt in 1..=max_extract_attempts {
761 match extract_with_claude(
762 &claude_binary,
763 &file_content,
764 args.claude_model.as_deref(),
765 args.claude_timeout,
766 ) {
767 Ok(result) => {
768 extraction_result = Some(result);
769 break;
770 }
771 Err(ref e) if format!("{e}").contains("RATE_LIMITED") => {
772 last_extract_err = Some(format!("{e}"));
773 break;
774 }
775 Err(e) => {
776 let msg = format!("{e}");
777 if attempt < max_extract_attempts {
778 tracing::warn!(target: "ingest", attempt, error = %msg, "extraction failed, retrying (cold-start workaround)");
779 std::thread::sleep(std::time::Duration::from_secs(2));
780 }
781 last_extract_err = Some(msg);
782 }
783 }
784 }
785
786 if let Some((extraction, cost)) = extraction_result {
787 backoff_secs = args.rate_limit_wait;
788
789 let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
790 std::path::Path::new(&extraction.name),
791 args.max_name_length,
792 );
793 let name = &normalized_name;
794 let ent_count = extraction.entities.len();
795 let rel_count = extraction.relationships.len();
796
797 let new_entities: Vec<NewEntity> = extraction
798 .entities
799 .iter()
800 .filter_map(|e| match e.entity_type.parse::<EntityType>() {
801 Ok(et) => Some(NewEntity {
802 name: e.name.clone(),
803 entity_type: et,
804 description: None,
805 }),
806 Err(_) => {
807 tracing::warn!(
808 target: "ingest",
809 entity = %e.name,
810 entity_type = %e.entity_type,
811 "entity type not recognized, skipping"
812 );
813 None
814 }
815 })
816 .collect();
817
818 let new_relationships: Vec<NewRelationship> = extraction
819 .relationships
820 .iter()
821 .map(|r| NewRelationship {
822 source: r.source.clone(),
823 target: r.target.clone(),
824 relation: r.relation.clone(),
825 strength: r.strength,
826 description: None,
827 })
828 .collect();
829
830 let body_str = String::from_utf8_lossy(&file_content);
831 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
832 let new_memory = NewMemory {
833 name: name.clone(),
834 namespace: namespace.clone(),
835 memory_type: memory_type_str.clone(),
836 description: extraction.description.clone(),
837 body: body_str.to_string(),
838 body_hash,
839 session_id: None,
840 source: "agent".to_string(),
841 metadata: serde_json::Value::Object(serde_json::Map::new()),
842 };
843
844 let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
846 Some((existing_id, is_deleted)) => {
847 if is_deleted {
848 memories::clear_deleted_at(&conn, existing_id)?;
849 }
850 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
851 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
852 rusqlite::params![existing_id],
853 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
854 )?;
855 memories::update(&conn, existing_id, &new_memory, None)?;
856 memories::sync_fts_after_update(
857 &conn,
858 existing_id,
859 &old_name,
860 &old_desc,
861 &old_body,
862 &new_memory.name,
863 &new_memory.description,
864 &new_memory.body,
865 )?;
866 tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
867 existing_id
868 }
869 None => match memories::insert(&conn, &new_memory) {
870 Ok(id) => id,
871 Err(e) => {
872 let err_msg = format!("{e}");
873 let _ = queue_conn.execute(
874 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
875 rusqlite::params![err_msg, queue_id],
876 );
877 let current_index = completed + failed + skipped;
878 failed += 1;
879 emit_json(&FileEvent {
880 file: &file_path,
881 name,
882 status: "failed",
883 memory_id: None,
884 entities: None,
885 rels: None,
886 cost_usd: Some(cost),
887 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
888 error: Some(&err_msg),
889 index: current_index,
890 total,
891 });
892 cost_total += cost;
893 if args.fail_fast {
894 break;
895 }
896 continue;
897 }
898 },
899 };
900
901 for ent in &new_entities {
902 match entities::upsert_entity(&conn, &namespace, ent) {
903 Ok(eid) => {
904 let _ = entities::link_memory_entity(&conn, memory_id, eid);
905 }
906 Err(e) => {
907 tracing::warn!(
908 target: "ingest",
909 entity = %ent.name,
910 error = %e,
911 "entity skipped due to validation error"
912 );
913 }
914 }
915 }
916 for rel in &new_relationships {
917 crate::parsers::warn_if_non_canonical(&rel.relation);
918 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
919 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
920 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
921 let _ = conn.execute(
922 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
923 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
924 );
925 }
926 }
927
928 let body_text = String::from_utf8_lossy(&file_content).into_owned();
930 let snippet: String = body_text.chars().take(200).collect();
931 let chunks_info =
932 crate::chunking::split_into_chunks_hierarchical(&body_text, tokenizer);
933
934 let embedding_result = if chunks_info.len() <= 1 {
935 crate::daemon::embed_passage_or_local(&paths.models, &body_text)
936 } else {
937 let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
938 let mut multi_ok = true;
939 for chunk in &chunks_info {
940 let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
941 match crate::daemon::embed_passage_or_local(&paths.models, chunk_text) {
942 Ok(emb) => chunk_embeddings.push(emb),
943 Err(e) => {
944 tracing::warn!(
945 target: "ingest",
946 file = %file_path,
947 error = %e,
948 "chunk embedding failed, skipping vector index for this file"
949 );
950 multi_ok = false;
951 break;
952 }
953 }
954 }
955 if multi_ok {
956 let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
957 if let Err(e) = crate::storage::chunks::insert_chunk_slices(
959 &conn,
960 memory_id,
961 &body_text,
962 &chunks_info,
963 ) {
964 tracing::warn!(
965 target: "ingest",
966 file = %file_path,
967 error = %e,
968 "chunk slice insert failed"
969 );
970 } else {
971 for (i, emb) in chunk_embeddings.iter().enumerate() {
972 if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
973 &conn, i as i64, memory_id, i as i32, emb,
974 ) {
975 tracing::warn!(
976 target: "ingest",
977 file = %file_path,
978 chunk = i,
979 error = %e,
980 "chunk vec upsert failed"
981 );
982 }
983 }
984 }
985 Ok(aggregated)
986 } else {
987 crate::daemon::embed_passage_or_local(&paths.models, &body_text)
989 }
990 };
991
992 match embedding_result {
993 Ok(embedding) => {
994 if let Err(e) = memories::upsert_vec(
995 &conn,
996 memory_id,
997 &namespace,
998 &memory_type_str,
999 &embedding,
1000 name,
1001 &snippet,
1002 ) {
1003 tracing::warn!(
1004 target: "ingest",
1005 file = %file_path,
1006 error = %e,
1007 "memory vec upsert failed; recall may not find this memory"
1008 );
1009 }
1010 for ent in &new_entities {
1012 if let Ok(Some(eid)) =
1013 entities::find_entity_id(&conn, &namespace, &ent.name)
1014 {
1015 let entity_text = ent.name.clone();
1016 match crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
1017 {
1018 Ok(emb) => {
1019 if let Err(e) = entities::upsert_entity_vec(
1020 &conn,
1021 eid,
1022 &namespace,
1023 ent.entity_type,
1024 &emb,
1025 &ent.name,
1026 ) {
1027 tracing::warn!(
1028 target: "ingest",
1029 entity = %ent.name,
1030 error = %e,
1031 "entity vec upsert failed"
1032 );
1033 }
1034 }
1035 Err(e) => {
1036 tracing::warn!(
1037 target: "ingest",
1038 entity = %ent.name,
1039 error = %e,
1040 "entity embedding failed"
1041 );
1042 }
1043 }
1044 }
1045 }
1046 }
1047 Err(e) => {
1048 tracing::warn!(
1049 target: "ingest",
1050 file = %file_path,
1051 error = %e,
1052 "memory embedding failed; recall will not find this memory"
1053 );
1054 }
1055 }
1056
1057 let _ = queue_conn.execute(
1058 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1059 rusqlite::params![
1060 name,
1061 memory_id,
1062 ent_count,
1063 rel_count,
1064 cost,
1065 file_started.elapsed().as_millis() as i64,
1066 queue_id
1067 ],
1068 );
1069
1070 let current_index = completed + failed + skipped;
1071 completed += 1;
1072 entities_total += ent_count;
1073 rels_total += rel_count;
1074 cost_total += cost;
1075
1076 emit_json(&FileEvent {
1077 file: &file_path,
1078 name,
1079 status: "done",
1080 memory_id: Some(memory_id),
1081 entities: Some(ent_count),
1082 rels: Some(rel_count),
1083 cost_usd: Some(cost),
1084 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1085 error: None,
1086 index: current_index,
1087 total,
1088 });
1089 } else if let Some(ref err_str) = last_extract_err {
1090 if err_str.contains("RATE_LIMITED") {
1091 tracing::warn!(
1092 target: "ingest",
1093 wait_seconds = backoff_secs,
1094 "rate limited, waiting before retry"
1095 );
1096 let _ = queue_conn.execute(
1097 "UPDATE queue SET status='pending' WHERE id=?1",
1098 rusqlite::params![queue_id],
1099 );
1100 std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
1101 backoff_secs = (backoff_secs * 2).min(900);
1102 continue;
1103 } else {
1104 let _ = queue_conn.execute(
1105 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1106 rusqlite::params![err_str, queue_id],
1107 );
1108 let current_index = completed + failed + skipped;
1109 failed += 1;
1110 emit_json(&FileEvent {
1111 file: &file_path,
1112 name: "",
1113 status: "failed",
1114 memory_id: None,
1115 entities: None,
1116 rels: None,
1117 cost_usd: None,
1118 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1119 error: Some(err_str),
1120 index: current_index,
1121 total,
1122 });
1123 if args.fail_fast {
1124 break;
1125 }
1126 }
1127 }
1128
1129 if let Some(budget) = args.max_cost_usd {
1130 if cost_total >= budget {
1131 tracing::warn!(
1132 target: "ingest",
1133 spent = cost_total,
1134 budget = budget,
1135 "budget exceeded, stopping"
1136 );
1137 break;
1138 }
1139 }
1140 }
1141
1142 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1144 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1145
1146 emit_json(&Summary {
1147 summary: true,
1148 files_total: total,
1149 completed,
1150 failed,
1151 skipped,
1152 entities_total,
1153 rels_total,
1154 cost_usd: cost_total,
1155 elapsed_ms: started.elapsed().as_millis() as u64,
1156 });
1157
1158 if !args.keep_queue && failed == 0 {
1159 let _ = std::fs::remove_file(&args.queue_db);
1160 }
1161
1162 Ok(())
1163}
1164
#[cfg(test)]
mod tests {
    use super::*;

    /// Restores an environment variable to the value captured at construction when
    /// dropped. A failing assertion therefore cannot leak modified env state into other
    /// tests in the same process.
    struct EnvGuard {
        key: &'static str,
        saved: Option<std::ffi::OsString>,
    }

    impl EnvGuard {
        fn capture(key: &'static str) -> Self {
            Self {
                key,
                saved: std::env::var_os(key),
            }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            match self.saved.take() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }

    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("RATE_LIMITED"));
    }

    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    #[test]
    fn test_find_claude_binary_not_found() {
        // NOTE(review): env vars are process-global, so this can still race with other
        // tests that read PATH concurrently. The guards at least restore both variables
        // (including "was unset") even if the assertion below panics.
        let _path_guard = EnvGuard::capture("PATH");
        let _override_guard = EnvGuard::capture("SQLITE_GRAPHRAG_CLAUDE_BINARY");
        std::env::set_var("PATH", "/nonexistent");
        std::env::remove_var("SQLITE_GRAPHRAG_CLAUDE_BINARY");
        let result = find_claude_binary(None);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost) = parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value = serde_json::from_str(EXTRACTION_SCHEMA).unwrap();
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t.as_str().unwrap();
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}