1use crate::commands::ingest::IngestArgs;
13use crate::entity_type::EntityType;
14use crate::errors::AppError;
15use crate::paths::AppPaths;
16use crate::storage::connection::{ensure_db_ready, open_rw};
17use crate::storage::entities::{self, NewEntity, NewRelationship};
18use crate::storage::memories::{self, NewMemory};
19
20use rusqlite::Connection;
21use serde::{Deserialize, Serialize};
22use std::io::Write;
23use std::path::{Path, PathBuf};
24use std::process::{Command, Stdio};
25use std::time::Instant;
26
/// Oldest Claude Code release this module accepts, written as `major.minor.patch`.
///
/// Compared against the output of `claude --version` by `validate_claude_version`.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
28
/// JSON Schema handed to `claude -p --json-schema`.
///
/// It describes the structured extraction result: a `name`, a `description`,
/// a list of typed `entities`, and a list of typed `relationships` whose
/// `strength` lies in `[0, 1]`. Every object level rejects additional
/// properties. The shape mirrors [`ExtractionResult`].
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
70
/// Instruction text passed to `claude -p`; the document itself arrives on stdin.
///
/// The relationship vocabulary listed here matches the `relation` enum in
/// [`EXTRACTION_SCHEMA`].
const EXTRACTION_PROMPT: &str = concat!(
    "You are a knowledge graph entity extractor. Given a document, extract:\n",
    "1. A short kebab-case name (max 60 chars) capturing the document's main topic\n",
    "2. A one-sentence description (10-20 words) summarizing the key insight\n",
    "3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n",
    "4. Typed relationships between entities with strength scores\n",
    "\n",
    "Rules:\n",
    "- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n",
    "- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n",
    "- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n",
    "- NEVER use 'mentions' as relationship type\n",
    "- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n",
    "- Prefer fewer high-quality entities over many low-quality ones\n",
    "- Description must answer: What is this about and WHY does it matter?",
);
84
85#[derive(Debug, Deserialize)]
86struct ClaudeOutputElement {
87 r#type: Option<String>,
88 subtype: Option<String>,
89 #[serde(default)]
90 is_error: bool,
91 structured_output: Option<ExtractionResult>,
92 result: Option<String>,
93 total_cost_usd: Option<f64>,
94 error: Option<String>,
95 terminal_reason: Option<String>,
96 #[serde(rename = "apiKeySource")]
97 api_key_source: Option<String>,
98}
99
100#[derive(Debug, Clone, Deserialize, Serialize)]
101pub struct ExtractionResult {
102 pub name: String,
103 pub description: String,
104 pub entities: Vec<ExtractedEntity>,
105 pub relationships: Vec<ExtractedRelationship>,
106}
107
108#[derive(Debug, Clone, Deserialize, Serialize)]
109pub struct ExtractedEntity {
110 pub name: String,
111 pub entity_type: String,
112}
113
114#[derive(Debug, Clone, Deserialize, Serialize)]
115pub struct ExtractedRelationship {
116 pub source: String,
117 pub target: String,
118 pub relation: String,
119 pub strength: f64,
120}
121
122#[derive(Debug, Serialize)]
123struct PhaseEvent<'a> {
124 phase: &'a str,
125 #[serde(skip_serializing_if = "Option::is_none")]
126 claude_path: Option<&'a str>,
127 #[serde(skip_serializing_if = "Option::is_none")]
128 version: Option<&'a str>,
129 #[serde(skip_serializing_if = "Option::is_none")]
130 dir: Option<&'a str>,
131 #[serde(skip_serializing_if = "Option::is_none")]
132 files_total: Option<usize>,
133 #[serde(skip_serializing_if = "Option::is_none")]
134 files_new: Option<usize>,
135 #[serde(skip_serializing_if = "Option::is_none")]
136 files_existing: Option<usize>,
137}
138
139#[derive(Debug, Serialize)]
140struct FileEvent<'a> {
141 file: &'a str,
142 name: &'a str,
143 status: &'a str,
144 #[serde(skip_serializing_if = "Option::is_none")]
145 memory_id: Option<i64>,
146 #[serde(skip_serializing_if = "Option::is_none")]
147 entities: Option<usize>,
148 #[serde(skip_serializing_if = "Option::is_none")]
149 rels: Option<usize>,
150 #[serde(skip_serializing_if = "Option::is_none")]
151 cost_usd: Option<f64>,
152 #[serde(skip_serializing_if = "Option::is_none")]
153 elapsed_ms: Option<u64>,
154 #[serde(skip_serializing_if = "Option::is_none")]
155 error: Option<&'a str>,
156 index: usize,
157 total: usize,
158}
159
160#[derive(Debug, Serialize)]
161struct Summary {
162 summary: bool,
163 files_total: usize,
164 completed: usize,
165 failed: usize,
166 skipped: usize,
167 entities_total: usize,
168 rels_total: usize,
169 cost_usd: f64,
170 elapsed_ms: u64,
171}
172
173pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
175 if let Some(p) = explicit {
176 if p.exists() {
177 return Ok(p.to_path_buf());
178 }
179 return Err(AppError::Validation(format!(
180 "Claude Code binary not found at explicit path: {}",
181 p.display()
182 )));
183 }
184
185 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
186 let p = PathBuf::from(&env_path);
187 if p.exists() {
188 return Ok(p);
189 }
190 }
191
192 let name = if cfg!(windows) {
193 "claude.exe"
194 } else {
195 "claude"
196 };
197 if let Some(path_var) = std::env::var_os("PATH") {
198 for dir in std::env::split_paths(&path_var) {
199 let candidate = dir.join(name);
200 if candidate.exists() {
201 return Ok(candidate);
202 }
203 }
204 }
205
206 Err(AppError::Validation(
207 "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
208 ))
209}
210
211fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
213 let output = Command::new(binary)
214 .arg("--version")
215 .stdin(Stdio::null())
216 .stdout(Stdio::piped())
217 .stderr(Stdio::piped())
218 .output()
219 .map_err(AppError::Io)?;
220
221 if !output.status.success() {
222 return Err(AppError::Validation(
223 "failed to run 'claude --version'".to_string(),
224 ));
225 }
226
227 let version_str = String::from_utf8(output.stdout)
228 .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
229 let version = version_str.trim().to_string();
230
231 let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
233
234 fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
235 let parts: Vec<&str> = s.splitn(3, '.').collect();
236 if parts.len() < 2 {
237 return None;
238 }
239 let major = parts[0].parse::<u64>().ok()?;
240 let minor = parts[1].parse::<u64>().ok()?;
241 let patch = parts
242 .get(2)
243 .and_then(|p| p.parse::<u64>().ok())
244 .unwrap_or(0);
245 Some((major, minor, patch))
246 }
247
248 if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
249 if actual < min {
250 return Err(AppError::Validation(format!(
251 "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
252 )));
253 }
254 }
255
256 Ok(version)
257}
258
259fn extract_with_claude(
266 binary: &Path,
267 file_content: &[u8],
268 model: Option<&str>,
269 timeout_secs: u64,
270) -> Result<(ExtractionResult, f64, bool), AppError> {
271 use wait_timeout::ChildExt;
272
273 let mut cmd = Command::new(binary);
274
275 cmd.env_clear();
276 for var in &[
277 "PATH",
278 "HOME",
279 "USER",
280 "SHELL",
281 "TERM",
282 "LANG",
283 "XDG_CONFIG_HOME",
284 "XDG_DATA_HOME",
285 "XDG_RUNTIME_DIR",
286 "ANTHROPIC_API_KEY",
287 "CLAUDE_CONFIG_DIR",
288 "TMPDIR",
289 "TMP",
290 "TEMP",
291 "DYLD_FALLBACK_LIBRARY_PATH",
292 ] {
293 if let Ok(val) = std::env::var(var) {
294 cmd.env(var, val);
295 }
296 }
297
298 #[cfg(windows)]
299 for var in &[
300 "LOCALAPPDATA",
301 "APPDATA",
302 "USERPROFILE",
303 "SystemRoot",
304 "COMSPEC",
305 "PATHEXT",
306 "HOMEPATH",
307 "HOMEDRIVE",
308 ] {
309 if let Ok(val) = std::env::var(var) {
310 cmd.env(var, val);
311 }
312 }
313
314 cmd.arg("-p")
315 .arg(EXTRACTION_PROMPT)
316 .arg("--output-format")
317 .arg("json")
318 .arg("--json-schema")
319 .arg(EXTRACTION_SCHEMA)
320 .arg("--max-turns")
321 .arg("7")
322 .arg("--no-session-persistence");
323
324 if std::env::var("ANTHROPIC_API_KEY").is_ok() {
325 cmd.arg("--bare");
326 } else {
327 cmd.arg("--dangerously-skip-permissions")
328 .arg("--settings")
329 .arg(r#"{"hooks":{}}"#);
330 }
331
332 if let Some(m) = model {
333 cmd.arg("--model").arg(m);
334 }
335
336 cmd.stdin(Stdio::piped())
337 .stdout(Stdio::piped())
338 .stderr(Stdio::piped());
339
340 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
341 AppError::Io(std::io::Error::new(
342 e.kind(),
343 format!("failed to spawn claude: {e}"),
344 ))
345 })?;
346
347 let stdin_data = file_content.to_vec();
348 let mut child_stdin = child
349 .stdin
350 .take()
351 .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
352 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
353 child_stdin.write_all(&stdin_data)?;
354 drop(child_stdin);
355 Ok(())
356 });
357
358 let start = std::time::Instant::now();
359 let timeout = std::time::Duration::from_secs(timeout_secs);
360 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
361
362 match status {
363 Some(exit_status) => {
364 stdin_thread
365 .join()
366 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
367 .map_err(AppError::Io)?;
368
369 tracing::debug!(
370 target: "process",
371 exit_code = ?exit_status.code(),
372 elapsed_ms = start.elapsed().as_millis() as u64,
373 "external process completed"
374 );
375
376 let mut stdout_buf = Vec::new();
377 let mut stderr_buf = Vec::new();
378 if let Some(mut out) = child.stdout.take() {
379 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
380 }
381 if let Some(mut err) = child.stderr.take() {
382 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
383 }
384
385 if !exit_status.success() {
386 let stdout_str = String::from_utf8_lossy(&stdout_buf);
387 if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
388 {
389 if let Some(re) = elements
390 .iter()
391 .find(|e| e.r#type.as_deref() == Some("result"))
392 {
393 if re.terminal_reason.as_deref() == Some("max_turns") {
394 tracing::warn!(
395 target: "ingest",
396 "extraction hit max_turns limit — hooks may have consumed turns"
397 );
398 return Err(AppError::Validation(
399 "claude -p hit max_turns: hooks may be consuming turns".into(),
400 ));
401 }
402 if re.is_error {
403 let err_msg = re
404 .error
405 .as_deref()
406 .or(re.result.as_deref())
407 .unwrap_or("unknown error");
408 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
409 return Err(AppError::RateLimited {
410 detail: err_msg.to_string(),
411 });
412 }
413 if err_msg.contains("Not logged in")
414 || err_msg.contains("authentication")
415 {
416 tracing::warn!(
417 target: "ingest",
418 "Claude Code authentication failed. Re-authenticate interactively with: claude"
419 );
420 }
421 return Err(AppError::Validation(format!(
422 "claude -p failed: {err_msg}"
423 )));
424 }
425 }
426 }
427 let stderr_str = String::from_utf8_lossy(&stderr_buf);
428 if stderr_str.contains("auth") || stderr_str.contains("login") {
429 tracing::warn!(
430 target: "ingest",
431 "Claude Code authentication may have failed. Re-authenticate with: claude"
432 );
433 }
434 return Err(AppError::Validation(format!(
435 "claude -p exited with code {:?}: {}",
436 exit_status.code(),
437 stderr_str.trim()
438 )));
439 }
440
441 let stdout = String::from_utf8(stdout_buf)
442 .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
443 parse_claude_output(&stdout)
444 }
445 None => {
446 tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
447 let _ = child.kill();
448 let _ = child.wait();
449 let _ = stdin_thread.join();
450 Err(AppError::Validation(format!(
451 "claude -p timed out after {timeout_secs} seconds"
452 )))
453 }
454 }
455}
456
457fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64, bool), AppError> {
462 let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
463 AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
464 })?;
465
466 let is_oauth = elements
467 .iter()
468 .find(|e| e.r#type.as_deref() == Some("system") && e.subtype.as_deref() == Some("init"))
469 .and_then(|e| e.api_key_source.as_deref())
470 .map(|s| s == "none")
471 .unwrap_or(false);
472
473 let result_elem = elements
474 .iter()
475 .find(|e| e.r#type.as_deref() == Some("result"))
476 .ok_or_else(|| {
477 AppError::Validation("claude output missing 'result' element".to_string())
478 })?;
479
480 if result_elem.is_error {
481 let err_msg = result_elem
482 .error
483 .as_deref()
484 .or(result_elem.result.as_deref())
485 .unwrap_or("unknown error");
486 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
487 return Err(AppError::RateLimited {
488 detail: err_msg.to_string(),
489 });
490 }
491 return Err(AppError::Validation(format!(
492 "claude extraction failed: {err_msg}"
493 )));
494 }
495
496 let extraction = result_elem
497 .structured_output
498 .clone()
499 .or_else(|| {
500 result_elem
501 .result
502 .as_ref()
503 .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
504 })
505 .ok_or_else(|| {
506 AppError::Validation("claude result missing structured_output and result field".into())
507 })?;
508
509 let cost = result_elem.total_cost_usd.unwrap_or(0.0);
510
511 Ok((extraction, cost, is_oauth))
512}
513
514use crate::output::emit_json_line as emit_json;
515
516fn collect_matching_files(
518 dir: &Path,
519 pattern: &str,
520 recursive: bool,
521 max_files: usize,
522) -> Result<Vec<PathBuf>, AppError> {
523 let mut files = Vec::new();
524 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
525 files.sort_unstable();
526
527 if files.len() > max_files {
528 return Err(AppError::Validation(format!(
529 "found {} files, exceeds --max-files cap of {}",
530 files.len(),
531 max_files
532 )));
533 }
534
535 Ok(files)
536}
537
538fn open_queue_db(path: &str) -> Result<Connection, AppError> {
540 let conn = Connection::open(path)?;
541
542 conn.pragma_update(None, "journal_mode", "wal")?;
543
544 conn.execute_batch(
545 "CREATE TABLE IF NOT EXISTS queue (
546 id INTEGER PRIMARY KEY AUTOINCREMENT,
547 file_path TEXT NOT NULL UNIQUE,
548 name TEXT,
549 status TEXT NOT NULL DEFAULT 'pending',
550 memory_id INTEGER,
551 entities INTEGER DEFAULT 0,
552 rels INTEGER DEFAULT 0,
553 error TEXT,
554 cost_usd REAL DEFAULT 0.0,
555 attempt INTEGER DEFAULT 0,
556 elapsed_ms INTEGER,
557 created_at TEXT DEFAULT (datetime('now')),
558 done_at TEXT
559 );
560 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
561 )?;
562
563 Ok(conn)
564}
565
566pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
568 let started = Instant::now();
569
570 if !args.dir.exists() {
571 return Err(AppError::Validation(format!(
572 "directory not found: {}",
573 args.dir.display()
574 )));
575 }
576
577 let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
579 let version = validate_claude_version(&claude_binary)?;
580 tracing::info!(
581 target: "ingest",
582 binary = %claude_binary.display(),
583 version = %version,
584 "Claude Code binary validated"
585 );
586
587 emit_json(&PhaseEvent {
588 phase: "validate",
589 claude_path: claude_binary.to_str(),
590 version: Some(&version),
591 dir: None,
592 files_total: None,
593 files_new: None,
594 files_existing: None,
595 });
596
597 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
599
600 let queue_conn = open_queue_db(&args.queue_db)?;
601
602 if args.resume {
603 let reset = queue_conn
604 .execute(
605 "UPDATE queue SET status='pending' WHERE status='processing'",
606 [],
607 )
608 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
609 if reset > 0 {
610 tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
611 }
612 }
613
614 if args.retry_failed {
615 let count = queue_conn
616 .execute(
617 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
618 [],
619 )
620 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
621 tracing::info!(target: "ingest", count, "retrying failed files");
622 }
623
624 if !args.resume && !args.retry_failed {
625 queue_conn
626 .execute("DELETE FROM queue", [])
627 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
628 }
629
630 let mut new_count = 0usize;
631 let mut existing_count = 0usize;
632
633 if !args.retry_failed {
634 for file in &files {
635 let file_str = file.to_string_lossy().into_owned();
636 let inserted = queue_conn
637 .execute(
638 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
639 rusqlite::params![file_str],
640 )
641 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
642 if inserted > 0 {
643 new_count += 1;
644 } else {
645 existing_count += 1;
646 }
647 }
648 }
649
650 emit_json(&PhaseEvent {
651 phase: "scan",
652 claude_path: None,
653 version: None,
654 dir: args.dir.to_str(),
655 files_total: Some(files.len()),
656 files_new: Some(new_count),
657 files_existing: Some(existing_count),
658 });
659
660 if args.dry_run {
661 for (idx, file) in files.iter().enumerate() {
662 let (name, _truncated, _orig) =
663 super::ingest::derive_kebab_name(file, args.max_name_length);
664 emit_json(&FileEvent {
665 file: &file.to_string_lossy(),
666 name: &name,
667 status: "preview",
668 memory_id: None,
669 entities: None,
670 rels: None,
671 cost_usd: None,
672 elapsed_ms: None,
673 error: None,
674 index: idx,
675 total: files.len(),
676 });
677 }
678 emit_json(&Summary {
679 summary: true,
680 files_total: files.len(),
681 completed: 0,
682 failed: 0,
683 skipped: 0,
684 entities_total: 0,
685 rels_total: 0,
686 cost_usd: 0.0,
687 elapsed_ms: started.elapsed().as_millis() as u64,
688 });
689 if !args.keep_queue {
690 let _ = std::fs::remove_file(&args.queue_db);
691 }
692 return Ok(());
693 }
694
695 let paths = AppPaths::resolve(args.db.as_deref())?;
697 ensure_db_ready(&paths)?;
698 let conn = open_rw(&paths.db)?;
699 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
700 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
701 let memory_type_str = args.r#type.as_str().to_string();
702
703 let mut completed = 0usize;
704 let mut failed = 0usize;
705 let skipped_initial: usize = queue_conn
706 .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
707 r.get::<_, usize>(0)
708 })
709 .unwrap_or(0);
710 let mut skipped = skipped_initial;
711 let mut entities_total = 0usize;
712 let mut rels_total = 0usize;
713 let mut cost_total = 0.0f64;
714 let mut oauth_detected = false;
715 let total = files.len();
716
717 let mut backoff_secs = args.rate_limit_wait;
718 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
719
720 loop {
721 if crate::shutdown_requested() {
722 tracing::info!(target: "ingest", "shutdown requested, stopping before next file");
723 break;
724 }
725
726 let pending: Option<(i64, String)> = queue_conn
727 .query_row(
728 "UPDATE queue SET status='processing', attempt=attempt+1 \
729 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
730 RETURNING id, file_path",
731 [],
732 |row| Ok((row.get(0)?, row.get(1)?)),
733 )
734 .ok();
735
736 let (queue_id, file_path) = match pending {
737 Some(p) => p,
738 None => break,
739 };
740
741 let file_started = Instant::now();
742
743 const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
745 if let Ok(meta) = std::fs::metadata(&file_path) {
746 if meta.len() > MAX_FILE_SIZE {
747 let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
748 let _ = queue_conn.execute(
749 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
750 rusqlite::params![err_msg, queue_id],
751 );
752 let current_index = completed + failed + skipped;
753 failed += 1;
754 emit_json(&FileEvent {
755 file: &file_path,
756 name: "",
757 status: "failed",
758 memory_id: None,
759 entities: None,
760 rels: None,
761 cost_usd: None,
762 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
763 error: Some(&err_msg),
764 index: current_index,
765 total,
766 });
767 if args.fail_fast {
768 break;
769 }
770 continue;
771 }
772 }
773
774 let file_content = match std::fs::read(&file_path) {
775 Ok(c) => c,
776 Err(e) => {
777 let err_msg = format!("IO error: {e}");
778 let _ = queue_conn.execute(
779 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
780 rusqlite::params![err_msg, queue_id],
781 );
782 let current_index = completed + failed + skipped;
783 failed += 1;
784 emit_json(&FileEvent {
785 file: &file_path,
786 name: "",
787 status: "failed",
788 memory_id: None,
789 entities: None,
790 rels: None,
791 cost_usd: None,
792 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
793 error: Some(&err_msg),
794 index: current_index,
795 total,
796 });
797 if args.fail_fast {
798 break;
799 }
800 continue;
801 }
802 };
803
804 if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
806 let err_msg = format!(
807 "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
808 crate::constants::MAX_MEMORY_BODY_LEN,
809 file_content.len()
810 );
811 tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
812 let _ = queue_conn.execute(
813 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
814 rusqlite::params![err_msg, queue_id],
815 );
816 let current_index = completed + failed + skipped;
817 skipped += 1;
818 emit_json(&FileEvent {
819 file: &file_path,
820 name: "",
821 status: "skipped",
822 memory_id: None,
823 entities: None,
824 rels: None,
825 cost_usd: None,
826 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
827 error: Some(&err_msg),
828 index: current_index,
829 total,
830 });
831 continue;
832 }
833
834 let max_extract_attempts: u32 = 2;
836 let mut extraction_result: Option<(ExtractionResult, f64, bool)> = None;
837 let mut last_extract_err: Option<String> = None;
838 let mut last_was_rate_limited = false;
839
840 for attempt in 1..=max_extract_attempts {
841 match extract_with_claude(
842 &claude_binary,
843 &file_content,
844 args.claude_model.as_deref(),
845 args.claude_timeout,
846 ) {
847 Ok(result) => {
848 extraction_result = Some(result);
849 break;
850 }
851 Err(ref e) if matches!(e, AppError::RateLimited { .. }) => {
852 last_extract_err = Some(format!("{e}"));
853 last_was_rate_limited = true;
854 break;
855 }
856 Err(e) => {
857 let msg = format!("{e}");
858 if attempt < max_extract_attempts {
859 let cold_start_delay = 2 * attempt as u64;
860 tracing::warn!(target: "ingest", attempt, delay_secs = cold_start_delay, error = %msg, "extraction failed, retrying (cold-start workaround)");
861 std::thread::sleep(std::time::Duration::from_secs(cold_start_delay));
862 }
863 last_extract_err = Some(msg);
864 }
865 }
866 }
867
868 if let Some((extraction, cost, is_oauth)) = extraction_result {
869 if is_oauth && !oauth_detected {
870 oauth_detected = true;
871 tracing::info!(target: "ingest", "OAuth subscription detected — cost_usd omitted from output");
872 }
873 backoff_secs = args.rate_limit_wait;
874
875 let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
876 std::path::Path::new(&extraction.name),
877 args.max_name_length,
878 );
879 let name = &normalized_name;
880 let ent_count = extraction.entities.len();
881 let rel_count = extraction.relationships.len();
882
883 let new_entities: Vec<NewEntity> = extraction
884 .entities
885 .iter()
886 .filter_map(|e| match e.entity_type.parse::<EntityType>() {
887 Ok(et) => Some(NewEntity {
888 name: e.name.clone(),
889 entity_type: et,
890 description: None,
891 }),
892 Err(_) => {
893 tracing::warn!(
894 target: "ingest",
895 entity = %e.name,
896 entity_type = %e.entity_type,
897 "entity type not recognized, skipping"
898 );
899 None
900 }
901 })
902 .collect();
903
904 let new_relationships: Vec<NewRelationship> = extraction
905 .relationships
906 .iter()
907 .map(|r| NewRelationship {
908 source: r.source.clone(),
909 target: r.target.clone(),
910 relation: crate::parsers::normalize_relation(&r.relation),
911 strength: r.strength,
912 description: None,
913 })
914 .collect();
915
916 let body_str = String::from_utf8_lossy(&file_content);
917 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
918 let new_memory = NewMemory {
919 name: name.clone(),
920 namespace: namespace.clone(),
921 memory_type: memory_type_str.clone(),
922 description: extraction.description.clone(),
923 body: body_str.to_string(),
924 body_hash,
925 session_id: None,
926 source: "agent".to_string(),
927 metadata: serde_json::Value::Object(serde_json::Map::new()),
928 };
929
930 let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
932 Some((existing_id, is_deleted)) => {
933 if is_deleted {
934 memories::clear_deleted_at(&conn, existing_id)?;
935 }
936 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
937 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
938 rusqlite::params![existing_id],
939 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
940 )?;
941 memories::update(&conn, existing_id, &new_memory, None)?;
942 memories::sync_fts_after_update(
943 &conn,
944 existing_id,
945 &old_name,
946 &old_desc,
947 &old_body,
948 &new_memory.name,
949 &new_memory.description,
950 &new_memory.body,
951 )?;
952 tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
953 existing_id
954 }
955 None => match memories::insert(&conn, &new_memory) {
956 Ok(id) => id,
957 Err(e) => {
958 let err_msg = format!("{e}");
959 let _ = queue_conn.execute(
960 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
961 rusqlite::params![err_msg, queue_id],
962 );
963 let current_index = completed + failed + skipped;
964 failed += 1;
965 emit_json(&FileEvent {
966 file: &file_path,
967 name,
968 status: "failed",
969 memory_id: None,
970 entities: None,
971 rels: None,
972 cost_usd: if is_oauth { None } else { Some(cost) },
973 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
974 error: Some(&err_msg),
975 index: current_index,
976 total,
977 });
978 if !is_oauth {
979 cost_total += cost;
980 }
981 if args.fail_fast {
982 break;
983 }
984 continue;
985 }
986 },
987 };
988
989 for ent in &new_entities {
990 match entities::upsert_entity(&conn, &namespace, ent) {
991 Ok(eid) => {
992 let _ = entities::link_memory_entity(&conn, memory_id, eid);
993 }
994 Err(e) => {
995 tracing::warn!(
996 target: "ingest",
997 entity = %ent.name,
998 error = %e,
999 "entity skipped due to validation error"
1000 );
1001 }
1002 }
1003 }
1004 for rel in &new_relationships {
1005 crate::parsers::warn_if_non_canonical(&rel.relation);
1006 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
1007 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
1008 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1009 let _ = conn.execute(
1010 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1011 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1012 );
1013 }
1014 }
1015
1016 let body_text = String::from_utf8_lossy(&file_content).into_owned();
1018 let snippet: String = body_text.chars().take(200).collect();
1019 let chunks_info =
1020 crate::chunking::split_into_chunks_hierarchical(&body_text, tokenizer);
1021
1022 let embedding_result = if chunks_info.len() <= 1 {
1023 crate::daemon::embed_passage_or_local(&paths.models, &body_text)
1024 } else {
1025 let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
1026 let mut multi_ok = true;
1027 for chunk in &chunks_info {
1028 let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
1029 match crate::daemon::embed_passage_or_local(&paths.models, chunk_text) {
1030 Ok(emb) => chunk_embeddings.push(emb),
1031 Err(e) => {
1032 tracing::warn!(
1033 target: "ingest",
1034 file = %file_path,
1035 error = %e,
1036 "chunk embedding failed, skipping vector index for this file"
1037 );
1038 multi_ok = false;
1039 break;
1040 }
1041 }
1042 }
1043 if multi_ok {
1044 let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
1045 if let Err(e) = crate::storage::chunks::insert_chunk_slices(
1047 &conn,
1048 memory_id,
1049 &body_text,
1050 &chunks_info,
1051 ) {
1052 tracing::warn!(
1053 target: "ingest",
1054 file = %file_path,
1055 error = %e,
1056 "chunk slice insert failed"
1057 );
1058 } else {
1059 for (i, emb) in chunk_embeddings.iter().enumerate() {
1060 if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
1061 &conn, i as i64, memory_id, i as i32, emb,
1062 ) {
1063 tracing::warn!(
1064 target: "ingest",
1065 file = %file_path,
1066 chunk = i,
1067 error = %e,
1068 "chunk vec upsert failed"
1069 );
1070 }
1071 }
1072 }
1073 Ok(aggregated)
1074 } else {
1075 crate::daemon::embed_passage_or_local(&paths.models, &body_text)
1077 }
1078 };
1079
1080 match embedding_result {
1081 Ok(embedding) => {
1082 if let Err(e) = memories::upsert_vec(
1083 &conn,
1084 memory_id,
1085 &namespace,
1086 &memory_type_str,
1087 &embedding,
1088 name,
1089 &snippet,
1090 ) {
1091 tracing::warn!(
1092 target: "ingest",
1093 file = %file_path,
1094 error = %e,
1095 "memory vec upsert failed; recall may not find this memory"
1096 );
1097 }
1098 for ent in &new_entities {
1100 if let Ok(Some(eid)) =
1101 entities::find_entity_id(&conn, &namespace, &ent.name)
1102 {
1103 let entity_text = ent.name.clone();
1104 match crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
1105 {
1106 Ok(emb) => {
1107 if let Err(e) = entities::upsert_entity_vec(
1108 &conn,
1109 eid,
1110 &namespace,
1111 ent.entity_type,
1112 &emb,
1113 &ent.name,
1114 ) {
1115 tracing::warn!(
1116 target: "ingest",
1117 entity = %ent.name,
1118 error = %e,
1119 "entity vec upsert failed"
1120 );
1121 }
1122 }
1123 Err(e) => {
1124 tracing::warn!(
1125 target: "ingest",
1126 entity = %ent.name,
1127 error = %e,
1128 "entity embedding failed"
1129 );
1130 }
1131 }
1132 }
1133 }
1134 }
1135 Err(e) => {
1136 tracing::warn!(
1137 target: "ingest",
1138 file = %file_path,
1139 error = %e,
1140 "memory embedding failed; recall will not find this memory"
1141 );
1142 }
1143 }
1144
1145 let _ = queue_conn.execute(
1146 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1147 rusqlite::params![
1148 name,
1149 memory_id,
1150 ent_count,
1151 rel_count,
1152 cost,
1153 file_started.elapsed().as_millis() as i64,
1154 queue_id
1155 ],
1156 );
1157
1158 let current_index = completed + failed + skipped;
1159 completed += 1;
1160 entities_total += ent_count;
1161 rels_total += rel_count;
1162 if !is_oauth {
1163 cost_total += cost;
1164 }
1165
1166 emit_json(&FileEvent {
1167 file: &file_path,
1168 name,
1169 status: "done",
1170 memory_id: Some(memory_id),
1171 entities: Some(ent_count),
1172 rels: Some(rel_count),
1173 cost_usd: if is_oauth { None } else { Some(cost) },
1174 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1175 error: None,
1176 index: current_index,
1177 total,
1178 });
1179 } else if let Some(ref err_str) = last_extract_err {
1180 if last_was_rate_limited {
1181 if crate::retry::is_kill_switch_active() {
1182 tracing::warn!(target: "ingest", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1183 } else if std::time::Instant::now() >= rate_limit_deadline {
1184 tracing::error!(target: "ingest", "rate-limit retry deadline (1h) exhausted");
1185 } else {
1186 let half = backoff_secs / 2;
1187 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1188 let actual_wait = half + jitter;
1189 tracing::warn!(target: "ingest", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
1190 let _ = queue_conn.execute(
1191 "UPDATE queue SET status='pending' WHERE id=?1",
1192 rusqlite::params![queue_id],
1193 );
1194 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1195 backoff_secs = (backoff_secs * 2).min(900);
1196 continue;
1197 }
1198 } else {
1199 let _ = queue_conn.execute(
1200 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1201 rusqlite::params![err_str, queue_id],
1202 );
1203 let current_index = completed + failed + skipped;
1204 failed += 1;
1205 emit_json(&FileEvent {
1206 file: &file_path,
1207 name: "",
1208 status: "failed",
1209 memory_id: None,
1210 entities: None,
1211 rels: None,
1212 cost_usd: None,
1213 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1214 error: Some(err_str),
1215 index: current_index,
1216 total,
1217 });
1218 if args.fail_fast {
1219 break;
1220 }
1221 }
1222 }
1223
1224 if let Some(budget) = args.max_cost_usd {
1225 if oauth_detected {
1226 tracing::debug!(target: "ingest", "--max-cost-usd ignored: OAuth subscription detected");
1227 } else if cost_total >= budget {
1228 tracing::warn!(
1229 target: "ingest",
1230 spent = cost_total,
1231 budget = budget,
1232 "budget exceeded, stopping"
1233 );
1234 break;
1235 }
1236 }
1237 }
1238
1239 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1241 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1242
1243 emit_json(&Summary {
1244 summary: true,
1245 files_total: total,
1246 completed,
1247 failed,
1248 skipped,
1249 entities_total,
1250 rels_total,
1251 cost_usd: cost_total,
1252 elapsed_ms: started.elapsed().as_millis() as u64,
1253 });
1254
1255 if !args.keep_queue && failed == 0 {
1256 let _ = std::fs::remove_file(&args.queue_db);
1257 }
1258
1259 Ok(())
1260}
1261
#[cfg(test)]
mod tests {
    //! Unit tests for the extraction schema, the `claude` JSON output parser,
    //! and binary discovery.

    use super::*;

    /// The embedded extraction schema must be syntactically valid JSON.
    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// A successful `result` event carrying `structured_output` yields the
    /// extraction payload and the reported cost.
    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost, _is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    /// A `result` event flagged `is_error` surfaces its `error` text in the
    /// returned error's display output.
    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(err.to_string().contains("authentication failed"));
    }

    /// An error message mentioning `rate_limit` maps to `AppError::RateLimited`.
    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Input that is not JSON at all is rejected with an error.
    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    /// With an unusable `PATH` and no binary override, lookup must fail.
    ///
    /// Environment variables are process-global, so both variables touched here
    /// are snapshotted first and put back exactly as they were (including the
    /// "unset" state) before the assertion runs. Previously `PATH` was left as
    /// `/nonexistent` when it had been unset, and the override variable was
    /// never restored, which could leak into other tests in the same process.
    ///
    /// NOTE(review): the test harness runs tests on parallel threads by default;
    /// any other test that reads these variables concurrently can still observe
    /// the temporary values. Consider serialising env-mutating tests.
    #[test]
    fn test_find_claude_binary_not_found() {
        const OVERRIDE_VAR: &str = "SQLITE_GRAPHRAG_CLAUDE_BINARY";

        let original_path = std::env::var_os("PATH");
        let original_override = std::env::var_os(OVERRIDE_VAR);

        std::env::set_var("PATH", "/nonexistent");
        std::env::remove_var(OVERRIDE_VAR);

        let result = find_claude_binary(None);

        // Restore the exact prior state before asserting, so a failing
        // assertion does not leave the modified environment behind.
        match original_path {
            Some(p) => std::env::set_var("PATH", p),
            None => std::env::remove_var("PATH"),
        }
        match original_override {
            Some(v) => std::env::set_var(OVERRIDE_VAR, v),
            None => std::env::remove_var(OVERRIDE_VAR),
        }

        assert!(result.is_err());
    }

    /// When `structured_output` is null, the parser falls back to decoding the
    /// JSON string stored in the `result` field.
    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost, _is_oauth) =
            parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    /// An error event that carries its message in `result` (rather than
    /// `error`) still surfaces that message.
    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    /// A `terminal_reason` of `max_turns` without `is_error` does not prevent
    /// the structured output from parsing.
    #[test]
    fn test_terminal_reason_max_turns_detected() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"terminal_reason":"max_turns","structured_output":{"name":"t","description":"d","entities":[],"relationships":[]}}
        ]"#;
        let err_or_ok = parse_claude_output(output);
        assert!(
            err_or_ok.is_ok(),
            "max_turns in result without is_error should still parse"
        );
    }

    /// `apiKeySource: "none"` in the init event is reported as OAuth, and the
    /// cost is still returned.
    #[test]
    fn test_detect_oauth_from_init_json() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.50,"structured_output":{"name":"test-oauth","description":"oauth test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(is_oauth, "apiKeySource=none must be detected as OAuth");
        assert!((cost - 0.50).abs() < f64::EPSILON);
    }

    /// `apiKeySource: "env"` in the init event is not reported as OAuth.
    #[test]
    fn test_api_key_source_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"env"},
            {"type":"result","is_error":false,"total_cost_usd":0.10,"structured_output":{"name":"test-api","description":"api test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "apiKeySource=env must NOT be detected as OAuth");
    }

    /// An init event with no `apiKeySource` field defaults to "not OAuth".
    #[test]
    fn test_missing_api_key_source_defaults_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.05,"structured_output":{"name":"test-missing","description":"missing test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "missing apiKeySource must default to not OAuth");
    }

    /// Every `entity_type` listed in the schema's enum must parse into
    /// `EntityType`, keeping the prompt schema and the Rust enum in sync.
    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value = serde_json::from_str(EXTRACTION_SCHEMA).unwrap();
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t.as_str().unwrap();
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}