1use crate::commands::ingest::IngestArgs;
13use crate::entity_type::EntityType;
14use crate::errors::AppError;
15use crate::paths::AppPaths;
16use crate::storage::connection::{ensure_db_ready, open_rw};
17use crate::storage::entities::{self, NewEntity, NewRelationship};
18use crate::storage::memories::{self, NewMemory};
19
20use rusqlite::Connection;
21use serde::{Deserialize, Serialize};
22use std::io::Write;
23use std::path::{Path, PathBuf};
24use std::process::{Command, Stdio};
25use std::time::Instant;
26
/// Oldest Claude Code CLI version accepted by `validate_claude_version`.
///
/// Compared numerically as `major.minor.patch` against the first token of
/// `claude --version` output.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
28
/// JSON Schema handed to `claude -p --json-schema` to constrain the model's
/// structured output.
///
/// Its top-level shape matches [`ExtractionResult`]:
/// - `name` and `description` strings;
/// - `entities` (each `name` + `entity_type`);
/// - `relationships` (`source`, `target`, `relation`, and `strength` bounded to `[0, 1]`).
///
/// `additionalProperties: false` rejects any extra keys.
///
/// NOTE(review): the `entity_type` enum must stay in sync with what
/// `EntityType` can parse, because entities whose type fails to parse are
/// dropped at ingest time. The `relation` enum duplicates the list spelled out
/// in `EXTRACTION_PROMPT`; keep both in sync.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
70
/// Instruction prompt passed as the `-p` argument to `claude`.
///
/// The document being analysed is not embedded here; `extract_with_claude`
/// writes it to the child's stdin. The relationship list below mirrors the
/// `relation` enum in `EXTRACTION_SCHEMA`.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
84
/// One element of the JSON printed by `claude -p --output-format json`.
///
/// Only the fields this module inspects are declared. Serde ignores any other
/// keys because `deny_unknown_fields` is not set. All fields are optional,
/// presumably because different element kinds (e.g. `system` vs `result`)
/// carry different subsets.
#[derive(Debug, Deserialize)]
struct ClaudeOutputElement {
    /// Element kind; this module looks for `"system"` and `"result"`.
    r#type: Option<String>,
    /// Sub-kind; `"init"` marks the system element that carries `apiKeySource`.
    subtype: Option<String>,
    /// Whether the CLI flagged this result as an error (`false` when absent).
    #[serde(default)]
    is_error: bool,
    /// Schema-constrained extraction requested via `--json-schema`, when present.
    structured_output: Option<ExtractionResult>,
    /// Free-text result. It is used as the error message fallback, and parsed
    /// as JSON when `structured_output` is missing.
    result: Option<String>,
    /// Cost of the run as reported by the CLI (treated as 0.0 when absent).
    total_cost_usd: Option<f64>,
    /// Error text; preferred over `result` when building error messages.
    error: Option<String>,
    /// Why the run stopped; `"max_turns"` is reported as a dedicated failure.
    terminal_reason: Option<String>,
    /// Credential source from the init element; `"none"` is treated as OAuth.
    #[serde(rename = "apiKeySource")]
    api_key_source: Option<String>,
}
99
/// Structured extraction returned by Claude for one document.
///
/// The field set mirrors the top level of `EXTRACTION_SCHEMA`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Suggested memory name (the prompt asks for kebab-case, <= 60 chars).
    pub name: String,
    /// One-sentence summary of the document.
    pub description: String,
    /// Entities mentioned in the document.
    pub entities: Vec<ExtractedEntity>,
    /// Typed relationships between the extracted entities.
    pub relationships: Vec<ExtractedRelationship>,
}
107
/// A single entity proposed by the extractor.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedEntity {
    /// Entity name as returned by the model.
    pub name: String,
    /// Raw type string, one of the schema's `entity_type` enum values. It is
    /// parsed into `EntityType` at ingest time; unparseable values are skipped.
    pub entity_type: String,
}
113
/// A directed, typed relationship between two extracted entities.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedRelationship {
    /// Name of the source entity.
    pub source: String,
    /// Name of the target entity.
    pub target: String,
    /// Relation type; normalized via `crate::parsers::normalize_relation` before storage.
    pub relation: String,
    /// Confidence/strength; the schema bounds it to `[0, 1]`.
    pub strength: f64,
}
121
/// JSON progress line emitted when an ingest phase completes.
///
/// In this file it is emitted for the `"validate"` and `"scan"` phases.
/// Fields that are `None` are omitted from the serialized output.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase identifier.
    phase: &'a str,
    /// Resolved path of the `claude` binary (validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    claude_path: Option<&'a str>,
    /// Output of `claude --version` (validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Directory being scanned (scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of matching files found (scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Files newly added to the queue (scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Files already present in the queue (scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
138
/// JSON progress line emitted once per processed (or previewed) file.
///
/// `status` values used in this file are `"preview"`, `"failed"`, `"skipped"`
/// and `"done"`. Fields that are `None` are omitted from the serialized output.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the source file.
    file: &'a str,
    /// Memory name derived for the file (empty when not yet known).
    name: &'a str,
    /// Outcome for this file.
    status: &'a str,
    /// ID of the created/updated memory, on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of extracted entities.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships reported for the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Extraction cost; left `None` for OAuth subscriptions.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error description for failed/skipped files.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Zero-based position of this file in the run's processing order.
    index: usize,
    /// Total number of files in the run.
    total: usize,
}
159
/// Final JSON line describing the outcome of an ingest run.
#[derive(Debug, Serialize)]
struct Summary {
    /// Set to `true` at the visible call site. Presumably a marker that lets
    /// consumers tell this line apart from per-file events.
    summary: bool,
    /// Number of files matched by the scan.
    files_total: usize,
    /// Files successfully ingested.
    completed: usize,
    /// Files that failed.
    failed: usize,
    /// Files skipped (already done, or over the body size limit).
    skipped: usize,
    /// Sum of extracted entities across completed files.
    entities_total: usize,
    /// Sum of relationships across completed files.
    rels_total: usize,
    /// Accumulated extraction cost (non-OAuth runs only).
    cost_usd: f64,
    /// Wall-clock duration of the whole run.
    elapsed_ms: u64,
}
172
173pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
175 if let Some(p) = explicit {
176 if p.exists() {
177 return Ok(p.to_path_buf());
178 }
179 return Err(AppError::Validation(format!(
180 "Claude Code binary not found at explicit path: {}",
181 p.display()
182 )));
183 }
184
185 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
186 let p = PathBuf::from(&env_path);
187 if p.exists() {
188 return Ok(p);
189 }
190 }
191
192 let name = if cfg!(windows) {
193 "claude.exe"
194 } else {
195 "claude"
196 };
197 if let Some(path_var) = std::env::var_os("PATH") {
198 for dir in std::env::split_paths(&path_var) {
199 let candidate = dir.join(name);
200 if candidate.exists() {
201 return Ok(candidate);
202 }
203 }
204 }
205
206 Err(AppError::Validation(
207 "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
208 ))
209}
210
211fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
213 let output = Command::new(binary)
214 .arg("--version")
215 .stdin(Stdio::null())
216 .stdout(Stdio::piped())
217 .stderr(Stdio::piped())
218 .output()
219 .map_err(AppError::Io)?;
220
221 if !output.status.success() {
222 return Err(AppError::Validation(
223 "failed to run 'claude --version'".to_string(),
224 ));
225 }
226
227 let version_str = String::from_utf8(output.stdout)
228 .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
229 let version = version_str.trim().to_string();
230
231 let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
233
234 fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
235 let parts: Vec<&str> = s.splitn(3, '.').collect();
236 if parts.len() < 2 {
237 return None;
238 }
239 let major = parts[0].parse::<u64>().ok()?;
240 let minor = parts[1].parse::<u64>().ok()?;
241 let patch = parts
242 .get(2)
243 .and_then(|p| p.parse::<u64>().ok())
244 .unwrap_or(0);
245 Some((major, minor, patch))
246 }
247
248 if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
249 if actual < min {
250 return Err(AppError::Validation(format!(
251 "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
252 )));
253 }
254 }
255
256 Ok(version)
257}
258
259fn extract_with_claude(
272 binary: &Path,
273 file_content: &[u8],
274 model: Option<&str>,
275 timeout_secs: u64,
276) -> Result<(ExtractionResult, f64, bool), AppError> {
277 use wait_timeout::ChildExt;
278
279 if let Ok(_key) = std::env::var("ANTHROPIC_API_KEY") {
283 let mut cmd = Command::new("false");
284 cmd.env_clear();
285 cmd.env("PATH", "/nonexistent");
286 cmd.arg("--oauth-only-violation-anthropic-api-key-set");
287 return Err(AppError::Validation(
288 "ANTHROPIC_API_KEY is set in the environment; \
289 sqlite-graphrag operates exclusively with OAuth (Pro/Max) and \
290 the API-key path is PROHIBITED (gaps.md:47). Unset the variable \
291 and re-run with `claude login` already completed in this session."
292 .to_string(),
293 ));
294 }
295
296 let mut cmd = Command::new(binary);
297
298 cmd.env_clear();
299 for var in &[
300 "PATH",
301 "HOME",
302 "USER",
303 "SHELL",
304 "TERM",
305 "LANG",
306 "XDG_CONFIG_HOME",
307 "XDG_DATA_HOME",
308 "XDG_RUNTIME_DIR",
309 "CLAUDE_CONFIG_DIR",
311 "TMPDIR",
312 "TMP",
313 "TEMP",
314 "DYLD_FALLBACK_LIBRARY_PATH",
315 ] {
316 if let Ok(val) = std::env::var(var) {
317 cmd.env(var, val);
318 }
319 }
320
321 #[cfg(windows)]
322 for var in &[
323 "LOCALAPPDATA",
324 "APPDATA",
325 "USERPROFILE",
326 "SystemRoot",
327 "COMSPEC",
328 "PATHEXT",
329 "HOMEPATH",
330 "HOMEDRIVE",
331 ] {
332 if let Ok(val) = std::env::var(var) {
333 cmd.env(var, val);
334 }
335 }
336
337 cmd.arg("-p")
340 .arg(EXTRACTION_PROMPT)
341 .arg("--strict-mcp-config")
342 .arg("--mcp-config")
343 .arg("{}")
344 .arg("--dangerously-skip-permissions")
345 .arg("--settings")
346 .arg(r#"{"hooks":{}}"#)
347 .arg("--output-format")
348 .arg("json")
349 .arg("--json-schema")
350 .arg(EXTRACTION_SCHEMA)
351 .arg("--max-turns")
352 .arg("7")
353 .arg("--no-session-persistence");
354
355 if let Some(m) = model {
356 cmd.arg("--model").arg(m);
357 }
358
359 cmd.stdin(Stdio::piped())
360 .stdout(Stdio::piped())
361 .stderr(Stdio::piped());
362
363 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
364 AppError::Io(std::io::Error::new(
365 e.kind(),
366 format!("failed to spawn claude: {e}"),
367 ))
368 })?;
369
370 let stdin_data = file_content.to_vec();
371 let mut child_stdin = child
372 .stdin
373 .take()
374 .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
375 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
376 child_stdin.write_all(&stdin_data)?;
377 drop(child_stdin);
378 Ok(())
379 });
380
381 let start = std::time::Instant::now();
382 let timeout = std::time::Duration::from_secs(timeout_secs);
383 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
384
385 match status {
386 Some(exit_status) => {
387 stdin_thread
388 .join()
389 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
390 .map_err(AppError::Io)?;
391
392 tracing::debug!(
393 target: "process",
394 exit_code = ?exit_status.code(),
395 elapsed_ms = start.elapsed().as_millis() as u64,
396 "external process completed"
397 );
398
399 let mut stdout_buf = Vec::new();
400 let mut stderr_buf = Vec::new();
401 if let Some(mut out) = child.stdout.take() {
402 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
403 }
404 if let Some(mut err) = child.stderr.take() {
405 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
406 }
407
408 if !exit_status.success() {
409 let stdout_str = String::from_utf8_lossy(&stdout_buf);
410 if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
411 {
412 if let Some(re) = elements
413 .iter()
414 .find(|e| e.r#type.as_deref() == Some("result"))
415 {
416 if re.terminal_reason.as_deref() == Some("max_turns") {
417 tracing::warn!(
418 target: "ingest",
419 "extraction hit max_turns limit — hooks may have consumed turns"
420 );
421 return Err(AppError::Validation(
422 "claude -p hit max_turns: hooks may be consuming turns".into(),
423 ));
424 }
425 if re.is_error {
426 let err_msg = re
427 .error
428 .as_deref()
429 .or(re.result.as_deref())
430 .unwrap_or("unknown error");
431 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
432 return Err(AppError::RateLimited {
433 detail: err_msg.to_string(),
434 });
435 }
436 if err_msg.contains("Not logged in")
437 || err_msg.contains("authentication")
438 {
439 tracing::warn!(
440 target: "ingest",
441 "Claude Code authentication failed. Re-authenticate interactively with: claude"
442 );
443 }
444 return Err(AppError::Validation(format!(
445 "claude -p failed: {err_msg}"
446 )));
447 }
448 }
449 }
450 let stderr_str = String::from_utf8_lossy(&stderr_buf);
451 if stderr_str.contains("auth") || stderr_str.contains("login") {
452 tracing::warn!(
453 target: "ingest",
454 "Claude Code authentication may have failed. Re-authenticate with: claude"
455 );
456 }
457 return Err(AppError::Validation(format!(
458 "claude -p exited with code {:?}: {}",
459 exit_status.code(),
460 stderr_str.trim()
461 )));
462 }
463
464 let stdout = String::from_utf8(stdout_buf)
465 .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
466 parse_claude_output(&stdout)
467 }
468 None => {
469 tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
470 let _ = child.kill();
471 let _ = child.wait();
472 let _ = stdin_thread.join();
473 Err(AppError::Validation(format!(
474 "claude -p timed out after {timeout_secs} seconds"
475 )))
476 }
477 }
478}
479
480fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64, bool), AppError> {
485 let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
486 AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
487 })?;
488
489 let is_oauth = elements
490 .iter()
491 .find(|e| e.r#type.as_deref() == Some("system") && e.subtype.as_deref() == Some("init"))
492 .and_then(|e| e.api_key_source.as_deref())
493 .map(|s| s == "none")
494 .unwrap_or(false);
495
496 let result_elem = elements
497 .iter()
498 .find(|e| e.r#type.as_deref() == Some("result"))
499 .ok_or_else(|| {
500 AppError::Validation("claude output missing 'result' element".to_string())
501 })?;
502
503 if result_elem.is_error {
504 let err_msg = result_elem
505 .error
506 .as_deref()
507 .or(result_elem.result.as_deref())
508 .unwrap_or("unknown error");
509 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
510 return Err(AppError::RateLimited {
511 detail: err_msg.to_string(),
512 });
513 }
514 return Err(AppError::Validation(format!(
515 "claude extraction failed: {err_msg}"
516 )));
517 }
518
519 let extraction = result_elem
520 .structured_output
521 .clone()
522 .or_else(|| {
523 result_elem
524 .result
525 .as_ref()
526 .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
527 })
528 .ok_or_else(|| {
529 AppError::Validation("claude result missing structured_output and result field".into())
530 })?;
531
532 let cost = result_elem.total_cost_usd.unwrap_or(0.0);
533
534 Ok((extraction, cost, is_oauth))
535}
536
537use crate::output::emit_json_line as emit_json;
538
539fn collect_matching_files(
541 dir: &Path,
542 pattern: &str,
543 recursive: bool,
544 max_files: usize,
545) -> Result<Vec<PathBuf>, AppError> {
546 let mut files = Vec::new();
547 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
548 files.sort_unstable();
549
550 if files.len() > max_files {
551 return Err(AppError::Validation(format!(
552 "found {} files, exceeds --max-files cap of {}",
553 files.len(),
554 max_files
555 )));
556 }
557
558 Ok(files)
559}
560
561fn open_queue_db(path: &str) -> Result<Connection, AppError> {
563 let conn = Connection::open(path)?;
564
565 conn.pragma_update(None, "journal_mode", "wal")?;
566
567 conn.execute_batch(
568 "CREATE TABLE IF NOT EXISTS queue (
569 id INTEGER PRIMARY KEY AUTOINCREMENT,
570 file_path TEXT NOT NULL UNIQUE,
571 name TEXT,
572 status TEXT NOT NULL DEFAULT 'pending',
573 memory_id INTEGER,
574 entities INTEGER DEFAULT 0,
575 rels INTEGER DEFAULT 0,
576 error TEXT,
577 cost_usd REAL DEFAULT 0.0,
578 attempt INTEGER DEFAULT 0,
579 elapsed_ms INTEGER,
580 created_at TEXT DEFAULT (datetime('now')),
581 done_at TEXT
582 );
583 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
584 )?;
585
586 Ok(conn)
587}
588
589pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
591 let started = Instant::now();
592
593 if !args.dir.exists() {
594 return Err(AppError::Validation(format!(
595 "directory not found: {}",
596 args.dir.display()
597 )));
598 }
599
600 let early_ns = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
605 let early_paths = AppPaths::resolve(args.db.as_deref())?;
606 let _singleton = crate::lock::acquire_job_singleton(
607 crate::lock::JobType::IngestClaudeCode,
608 &early_ns,
609 &early_paths.db,
610 args.wait_job_singleton,
611 args.force_job_singleton,
612 )?;
613
614 let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
616 let version = validate_claude_version(&claude_binary)?;
617 tracing::info!(
618 target: "ingest",
619 binary = %claude_binary.display(),
620 version = %version,
621 "Claude Code binary validated"
622 );
623
624 emit_json(&PhaseEvent {
625 phase: "validate",
626 claude_path: claude_binary.to_str(),
627 version: Some(&version),
628 dir: None,
629 files_total: None,
630 files_new: None,
631 files_existing: None,
632 });
633
634 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
636
637 let queue_conn = open_queue_db(&args.queue_db)?;
638
639 if args.resume {
640 let reset = queue_conn
641 .execute(
642 "UPDATE queue SET status='pending' WHERE status='processing'",
643 [],
644 )
645 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
646 if reset > 0 {
647 tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
648 }
649 }
650
651 if args.retry_failed {
652 let count = queue_conn
653 .execute(
654 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
655 [],
656 )
657 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
658 tracing::info!(target: "ingest", count, "retrying failed files");
659 }
660
661 if !args.resume && !args.retry_failed {
662 queue_conn
663 .execute("DELETE FROM queue", [])
664 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
665 }
666
667 let mut new_count = 0usize;
668 let mut existing_count = 0usize;
669
670 if !args.retry_failed {
671 for file in &files {
672 let file_str = file.to_string_lossy().into_owned();
673 let inserted = queue_conn
674 .execute(
675 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
676 rusqlite::params![file_str],
677 )
678 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
679 if inserted > 0 {
680 new_count += 1;
681 } else {
682 existing_count += 1;
683 }
684 }
685 }
686
687 emit_json(&PhaseEvent {
688 phase: "scan",
689 claude_path: None,
690 version: None,
691 dir: args.dir.to_str(),
692 files_total: Some(files.len()),
693 files_new: Some(new_count),
694 files_existing: Some(existing_count),
695 });
696
697 if args.dry_run {
698 for (idx, file) in files.iter().enumerate() {
699 let (name, _truncated, _orig) =
700 super::ingest::derive_kebab_name(file, args.max_name_length);
701 emit_json(&FileEvent {
702 file: &file.to_string_lossy(),
703 name: &name,
704 status: "preview",
705 memory_id: None,
706 entities: None,
707 rels: None,
708 cost_usd: None,
709 elapsed_ms: None,
710 error: None,
711 index: idx,
712 total: files.len(),
713 });
714 }
715 emit_json(&Summary {
716 summary: true,
717 files_total: files.len(),
718 completed: 0,
719 failed: 0,
720 skipped: 0,
721 entities_total: 0,
722 rels_total: 0,
723 cost_usd: 0.0,
724 elapsed_ms: started.elapsed().as_millis() as u64,
725 });
726 if !args.keep_queue {
727 let _ = std::fs::remove_file(&args.queue_db);
728 }
729 return Ok(());
730 }
731
732 let paths = AppPaths::resolve(args.db.as_deref())?;
734 ensure_db_ready(&paths)?;
735 let conn = open_rw(&paths.db)?;
736
737 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
738 let memory_type_str = args.r#type.as_str().to_string();
739
740 let mut completed = 0usize;
741 let mut failed = 0usize;
742 let skipped_initial: usize = queue_conn
743 .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
744 r.get::<_, usize>(0)
745 })
746 .unwrap_or(0);
747 let mut skipped = skipped_initial;
748 let mut entities_total = 0usize;
749 let mut rels_total = 0usize;
750 let mut cost_total = 0.0f64;
751 let mut oauth_detected = false;
752 let total = files.len();
753
754 let mut backoff_secs = args.rate_limit_wait;
755 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
756
757 loop {
758 if crate::shutdown_requested() {
759 tracing::info!(target: "ingest", "shutdown requested, stopping before next file");
760 break;
761 }
762
763 let pending: Option<(i64, String)> = queue_conn
764 .query_row(
765 "UPDATE queue SET status='processing', attempt=attempt+1 \
766 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
767 RETURNING id, file_path",
768 [],
769 |row| Ok((row.get(0)?, row.get(1)?)),
770 )
771 .ok();
772
773 let (queue_id, file_path) = match pending {
774 Some(p) => p,
775 None => break,
776 };
777
778 let file_started = Instant::now();
779
780 const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
782 if let Ok(meta) = std::fs::metadata(&file_path) {
783 if meta.len() > MAX_FILE_SIZE {
784 let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
785 let _ = queue_conn.execute(
786 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
787 rusqlite::params![err_msg, queue_id],
788 );
789 let current_index = completed + failed + skipped;
790 failed += 1;
791 emit_json(&FileEvent {
792 file: &file_path,
793 name: "",
794 status: "failed",
795 memory_id: None,
796 entities: None,
797 rels: None,
798 cost_usd: None,
799 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
800 error: Some(&err_msg),
801 index: current_index,
802 total,
803 });
804 if args.fail_fast {
805 break;
806 }
807 continue;
808 }
809 }
810
811 let file_content = match std::fs::read(&file_path) {
812 Ok(c) => c,
813 Err(e) => {
814 let err_msg = format!("IO error: {e}");
815 let _ = queue_conn.execute(
816 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
817 rusqlite::params![err_msg, queue_id],
818 );
819 let current_index = completed + failed + skipped;
820 failed += 1;
821 emit_json(&FileEvent {
822 file: &file_path,
823 name: "",
824 status: "failed",
825 memory_id: None,
826 entities: None,
827 rels: None,
828 cost_usd: None,
829 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
830 error: Some(&err_msg),
831 index: current_index,
832 total,
833 });
834 if args.fail_fast {
835 break;
836 }
837 continue;
838 }
839 };
840
841 if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
843 let err_msg = format!(
844 "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
845 crate::constants::MAX_MEMORY_BODY_LEN,
846 file_content.len()
847 );
848 tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
849 let _ = queue_conn.execute(
850 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
851 rusqlite::params![err_msg, queue_id],
852 );
853 let current_index = completed + failed + skipped;
854 skipped += 1;
855 emit_json(&FileEvent {
856 file: &file_path,
857 name: "",
858 status: "skipped",
859 memory_id: None,
860 entities: None,
861 rels: None,
862 cost_usd: None,
863 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
864 error: Some(&err_msg),
865 index: current_index,
866 total,
867 });
868 continue;
869 }
870
871 let max_extract_attempts: u32 = 2;
873 let mut extraction_result: Option<(ExtractionResult, f64, bool)> = None;
874 let mut last_extract_err: Option<String> = None;
875 let mut last_was_rate_limited = false;
876
877 for attempt in 1..=max_extract_attempts {
878 match extract_with_claude(
879 &claude_binary,
880 &file_content,
881 args.claude_model.as_deref(),
882 args.claude_timeout,
883 ) {
884 Ok(result) => {
885 extraction_result = Some(result);
886 break;
887 }
888 Err(ref e) if matches!(e, AppError::RateLimited { .. }) => {
889 last_extract_err = Some(format!("{e}"));
890 last_was_rate_limited = true;
891 break;
892 }
893 Err(e) => {
894 let msg = format!("{e}");
895 if attempt < max_extract_attempts {
896 let cold_start_delay = 2 * attempt as u64;
897 tracing::warn!(target: "ingest", attempt, delay_secs = cold_start_delay, error = %msg, "extraction failed, retrying (cold-start workaround)");
898 std::thread::sleep(std::time::Duration::from_secs(cold_start_delay));
899 }
900 last_extract_err = Some(msg);
901 }
902 }
903 }
904
905 if let Some((extraction, cost, is_oauth)) = extraction_result {
906 if is_oauth && !oauth_detected {
907 oauth_detected = true;
908 tracing::info!(target: "ingest", "OAuth subscription detected — cost_usd omitted from output");
909 }
910 backoff_secs = args.rate_limit_wait;
911
912 let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
913 std::path::Path::new(&extraction.name),
914 args.max_name_length,
915 );
916 let name = &normalized_name;
917 let ent_count = extraction.entities.len();
918 let rel_count = 0;
919
920 let new_entities: Vec<NewEntity> = extraction
921 .entities
922 .iter()
923 .filter_map(|e| match e.entity_type.parse::<EntityType>() {
924 Ok(et) => Some(NewEntity {
925 name: e.name.clone(),
926 entity_type: et,
927 description: None,
928 }),
929 Err(_) => {
930 tracing::warn!(
931 target: "ingest",
932 entity = %e.name,
933 entity_type = %e.entity_type,
934 "entity type not recognized, skipping"
935 );
936 None
937 }
938 })
939 .collect();
940
941 let new_relationships: Vec<NewRelationship> = extraction
942 .relationships
943 .iter()
944 .map(|r| NewRelationship {
945 source: r.source.clone(),
946 target: r.target.clone(),
947 relation: crate::parsers::normalize_relation(&r.relation),
948 strength: r.strength,
949 description: None,
950 })
951 .collect();
952
953 let body_str = String::from_utf8(file_content.clone())
954 .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
955 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
956 let new_memory = NewMemory {
957 name: name.clone(),
958 namespace: namespace.clone(),
959 memory_type: memory_type_str.clone(),
960 description: extraction.description.clone(),
961 body: body_str.to_string(),
962 body_hash,
963 session_id: None,
964 source: "agent".to_string(),
965 metadata: serde_json::Value::Object(serde_json::Map::new()),
966 };
967
968 let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
970 Some((existing_id, is_deleted)) => {
971 if is_deleted {
972 memories::clear_deleted_at(&conn, existing_id)?;
973 }
974 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
975 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
976 rusqlite::params![existing_id],
977 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
978 )?;
979 memories::update(&conn, existing_id, &new_memory, None)?;
980 memories::sync_fts_after_update(
981 &conn,
982 existing_id,
983 &old_name,
984 &old_desc,
985 &old_body,
986 &new_memory.name,
987 &new_memory.description,
988 &new_memory.body,
989 )?;
990 tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
991 existing_id
992 }
993 None => match memories::insert(&conn, &new_memory) {
994 Ok(id) => id,
995 Err(e) => {
996 let err_msg = format!("{e}");
997 let _ = queue_conn.execute(
998 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
999 rusqlite::params![err_msg, queue_id],
1000 );
1001 let current_index = completed + failed + skipped;
1002 failed += 1;
1003 emit_json(&FileEvent {
1004 file: &file_path,
1005 name,
1006 status: "failed",
1007 memory_id: None,
1008 entities: None,
1009 rels: None,
1010 cost_usd: if is_oauth { None } else { Some(cost) },
1011 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1012 error: Some(&err_msg),
1013 index: current_index,
1014 total,
1015 });
1016 if !is_oauth {
1017 cost_total += cost;
1018 }
1019 if args.fail_fast {
1020 break;
1021 }
1022 continue;
1023 }
1024 },
1025 };
1026
1027 for ent in &new_entities {
1028 match entities::upsert_entity(&conn, &namespace, ent) {
1029 Ok(eid) => {
1030 let _ = entities::link_memory_entity(&conn, memory_id, eid);
1031 }
1032 Err(e) => {
1033 tracing::warn!(
1034 target: "ingest",
1035 entity = %ent.name,
1036 error = %e,
1037 "entity skipped due to validation error"
1038 );
1039 }
1040 }
1041 }
1042 for rel in &new_relationships {
1043 crate::parsers::warn_if_non_canonical(&rel.relation);
1044 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
1045 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
1046 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1047 let _ = conn.execute(
1048 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1049 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1050 );
1051 }
1052 }
1053
1054 let body_text = String::from_utf8(file_content.clone())
1056 .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
1057 let snippet: String = body_text.chars().take(200).collect();
1058 let chunks_info = crate::chunking::split_into_chunks_hierarchical(&body_text);
1059
1060 let embedding_result = if chunks_info.len() <= 1 {
1061 crate::embedder::embed_passage_local(&paths.models, &body_text)
1062 } else {
1063 let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
1064 let mut multi_ok = true;
1065 for chunk in &chunks_info {
1066 let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
1067 match crate::embedder::embed_passage_local(&paths.models, chunk_text) {
1068 Ok(emb) => chunk_embeddings.push(emb),
1069 Err(e) => {
1070 tracing::warn!(
1071 target: "ingest",
1072 file = %file_path,
1073 error = %e,
1074 "chunk embedding failed, skipping vector index for this file"
1075 );
1076 multi_ok = false;
1077 break;
1078 }
1079 }
1080 }
1081 if multi_ok {
1082 let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
1083 if let Err(e) = crate::storage::chunks::insert_chunk_slices(
1085 &conn,
1086 memory_id,
1087 &body_text,
1088 &chunks_info,
1089 ) {
1090 tracing::warn!(
1091 target: "ingest",
1092 file = %file_path,
1093 error = %e,
1094 "chunk slice insert failed"
1095 );
1096 } else {
1097 for (i, emb) in chunk_embeddings.iter().enumerate() {
1098 if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
1099 &conn, i as i64, memory_id, i as i32, emb,
1100 ) {
1101 tracing::warn!(
1102 target: "ingest",
1103 file = %file_path,
1104 chunk = i,
1105 error = %e,
1106 "chunk vec upsert failed"
1107 );
1108 }
1109 }
1110 }
1111 Ok(aggregated)
1112 } else {
1113 crate::embedder::embed_passage_local(&paths.models, &body_text)
1115 }
1116 };
1117
1118 match embedding_result {
1119 Ok(embedding) => {
1120 if let Err(e) = memories::upsert_vec(
1121 &conn,
1122 memory_id,
1123 &namespace,
1124 &memory_type_str,
1125 &embedding,
1126 name,
1127 &snippet,
1128 ) {
1129 tracing::warn!(
1130 target: "ingest",
1131 file = %file_path,
1132 error = %e,
1133 "memory vec upsert failed; recall may not find this memory"
1134 );
1135 }
1136 for ent in &new_entities {
1138 if let Ok(Some(eid)) =
1139 entities::find_entity_id(&conn, &namespace, &ent.name)
1140 {
1141 let entity_text = ent.name.clone();
1142 match crate::embedder::embed_passage_local(&paths.models, &entity_text)
1143 {
1144 Ok(emb) => {
1145 if let Err(e) = entities::upsert_entity_vec(
1146 &conn,
1147 eid,
1148 &namespace,
1149 ent.entity_type,
1150 &emb,
1151 &ent.name,
1152 ) {
1153 tracing::warn!(
1154 target: "ingest",
1155 entity = %ent.name,
1156 error = %e,
1157 "entity vec upsert failed"
1158 );
1159 }
1160 }
1161 Err(e) => {
1162 tracing::warn!(
1163 target: "ingest",
1164 entity = %ent.name,
1165 error = %e,
1166 "entity embedding failed"
1167 );
1168 }
1169 }
1170 }
1171 }
1172 }
1173 Err(e) => {
1174 tracing::warn!(
1175 target: "ingest",
1176 file = %file_path,
1177 error = %e,
1178 "memory embedding failed; recall will not find this memory"
1179 );
1180 }
1181 }
1182
1183 let _ = queue_conn.execute(
1184 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1185 rusqlite::params![
1186 name,
1187 memory_id,
1188 ent_count,
1189 rel_count,
1190 cost,
1191 file_started.elapsed().as_millis() as i64,
1192 queue_id
1193 ],
1194 );
1195
1196 let current_index = completed + failed + skipped;
1197 completed += 1;
1198 entities_total += ent_count;
1199 rels_total += rel_count;
1200 if !is_oauth {
1201 cost_total += cost;
1202 }
1203
1204 emit_json(&FileEvent {
1205 file: &file_path,
1206 name,
1207 status: "done",
1208 memory_id: Some(memory_id),
1209 entities: Some(ent_count),
1210 rels: Some(rel_count),
1211 cost_usd: if is_oauth { None } else { Some(cost) },
1212 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1213 error: None,
1214 index: current_index,
1215 total,
1216 });
1217 } else if let Some(ref err_str) = last_extract_err {
1218 if last_was_rate_limited {
1219 if crate::retry::is_kill_switch_active() {
1220 tracing::warn!(target: "ingest", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1221 } else if std::time::Instant::now() >= rate_limit_deadline {
1222 tracing::error!(target: "ingest", "rate-limit retry deadline (1h) exhausted");
1223 } else {
1224 let half = backoff_secs / 2;
1225 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1226 let actual_wait = half + jitter;
1227 tracing::warn!(target: "ingest", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
1228 let _ = queue_conn.execute(
1229 "UPDATE queue SET status='pending' WHERE id=?1",
1230 rusqlite::params![queue_id],
1231 );
1232 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1233 backoff_secs = (backoff_secs * 2).min(900);
1234 continue;
1235 }
1236 } else {
1237 let _ = queue_conn.execute(
1238 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1239 rusqlite::params![err_str, queue_id],
1240 );
1241 let current_index = completed + failed + skipped;
1242 failed += 1;
1243 emit_json(&FileEvent {
1244 file: &file_path,
1245 name: "",
1246 status: "failed",
1247 memory_id: None,
1248 entities: None,
1249 rels: None,
1250 cost_usd: None,
1251 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1252 error: Some(err_str),
1253 index: current_index,
1254 total,
1255 });
1256 if args.fail_fast {
1257 break;
1258 }
1259 }
1260 }
1261
1262 if let Some(budget) = args.max_cost_usd {
1263 if oauth_detected {
1264 tracing::debug!(target: "ingest", "--max-cost-usd ignored: OAuth subscription detected");
1265 } else if cost_total >= budget {
1266 tracing::warn!(
1267 target: "ingest",
1268 spent = cost_total,
1269 budget = budget,
1270 "budget exceeded, stopping"
1271 );
1272 break;
1273 }
1274 }
1275 }
1276
1277 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1279 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1280
1281 emit_json(&Summary {
1282 summary: true,
1283 files_total: total,
1284 completed,
1285 failed,
1286 skipped,
1287 entities_total,
1288 rels_total,
1289 cost_usd: cost_total,
1290 elapsed_ms: started.elapsed().as_millis() as u64,
1291 });
1292
1293 if !args.keep_queue && failed == 0 {
1294 let _ = std::fs::remove_file(&args.queue_db);
1295 }
1296
1297 Ok(())
1298}
1299
#[cfg(test)]
mod tests {
    use super::*;

    /// Snapshot of one environment variable that puts it back exactly as it was
    /// (re-set to the original value, or removed if it was originally unset)
    /// when dropped. Doing the restore in `Drop` means a failing assertion or a
    /// panic inside the test body cannot leak mutated environment state into
    /// tests that run later in the same process.
    struct EnvVarGuard {
        key: &'static str,
        original: Option<std::ffi::OsString>,
    }

    impl EnvVarGuard {
        /// Records the current value of `key` so it can be restored on drop.
        fn capture(key: &'static str) -> Self {
            Self {
                key,
                original: std::env::var_os(key),
            }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.original.take() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// `EXTRACTION_SCHEMA` is a hand-written raw string; it must at least parse as JSON.
    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// A successful `result` event with `structured_output` yields the extraction
    /// and the reported `total_cost_usd`.
    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
 {"type":"system","subtype":"init"},
 {"type":"assistant"},
 {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
 ]"#;
        let (result, cost, _is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    /// An `is_error: true` result surfaces its `error` text in the returned error.
    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
 {"type":"system","subtype":"init"},
 {"type":"result","is_error":true,"error":"authentication failed"}
 ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// An error message mentioning `rate_limit` is classified as `AppError::RateLimited`.
    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
 {"type":"system","subtype":"init"},
 {"type":"result","is_error":true,"error":"rate_limit exceeded"}
 ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Non-JSON CLI output is reported as an error rather than panicking.
    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    /// With `PATH` pointing nowhere and the binary override cleared,
    /// `find_claude_binary(None)` must fail.
    ///
    /// Both variables are captured by guards so they are restored to their
    /// exact prior state (including "unset") even if the assertion fails.
    /// NOTE(review): environment variables are process-global and `cargo test`
    /// runs tests on parallel threads by default, so another test reading
    /// `PATH` concurrently could still observe `/nonexistent`; consider
    /// serializing env-mutating tests if that ever becomes flaky.
    #[test]
    fn test_find_claude_binary_not_found() {
        let _path_guard = EnvVarGuard::capture("PATH");
        let _binary_guard = EnvVarGuard::capture("SQLITE_GRAPHRAG_CLAUDE_BINARY");
        std::env::set_var("PATH", "/nonexistent");
        std::env::remove_var("SQLITE_GRAPHRAG_CLAUDE_BINARY");
        let result = find_claude_binary(None);
        assert!(result.is_err());
    }

    /// When `structured_output` is null, the JSON string in `result` is used instead.
    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
 {"type":"system","subtype":"init"},
 {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
 ]"#;
        let (result, cost, _is_oauth) =
            parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    /// An error result carrying its message in `result` (no `error` field) still
    /// surfaces that message.
    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
 {"type":"system","subtype":"init"},
 {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
 ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    /// A `terminal_reason` of `max_turns` on a non-error result must not, on its
    /// own, make parsing fail when `structured_output` is present.
    #[test]
    fn test_terminal_reason_max_turns_detected() {
        let output = r#"[
 {"type":"system","subtype":"init"},
 {"type":"result","is_error":false,"terminal_reason":"max_turns","structured_output":{"name":"t","description":"d","entities":[],"relationships":[]}}
 ]"#;
        let err_or_ok = parse_claude_output(output);
        assert!(
            err_or_ok.is_ok(),
            "max_turns in result without is_error should still parse"
        );
    }

    /// `apiKeySource: "none"` in the init event is reported as OAuth, while the
    /// reported cost is still returned unchanged.
    #[test]
    fn test_detect_oauth_from_init_json() {
        let output = r#"[
 {"type":"system","subtype":"init","apiKeySource":"none"},
 {"type":"result","is_error":false,"total_cost_usd":0.50,"structured_output":{"name":"test-oauth","description":"oauth test","entities":[],"relationships":[]}}
 ]"#;
        let (_result, cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(is_oauth, "apiKeySource=none must be detected as OAuth");
        assert!((cost - 0.50).abs() < f64::EPSILON);
    }

    /// `apiKeySource: "env"` is an API-key session, not OAuth.
    #[test]
    fn test_api_key_source_not_oauth() {
        let output = r#"[
 {"type":"system","subtype":"init","apiKeySource":"env"},
 {"type":"result","is_error":false,"total_cost_usd":0.10,"structured_output":{"name":"test-api","description":"api test","entities":[],"relationships":[]}}
 ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "apiKeySource=env must NOT be detected as OAuth");
    }

    /// An init event without `apiKeySource` defaults to "not OAuth".
    #[test]
    fn test_missing_api_key_source_defaults_not_oauth() {
        let output = r#"[
 {"type":"system","subtype":"init"},
 {"type":"result","is_error":false,"total_cost_usd":0.05,"structured_output":{"name":"test-missing","description":"missing test","entities":[],"relationships":[]}}
 ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "missing apiKeySource must default to not OAuth");
    }

    /// Every `entity_type` the schema allows must parse into `EntityType`, so
    /// schema-valid model output can never be rejected at conversion time.
    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value = serde_json::from_str(EXTRACTION_SCHEMA).unwrap();
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t.as_str().unwrap();
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}