1use crate::commands::ingest::IngestArgs;
13use crate::entity_type::EntityType;
14use crate::errors::AppError;
15use crate::paths::AppPaths;
16use crate::spawn::env_whitelist::apply_env_whitelist;
17use crate::storage::connection::{ensure_db_ready, open_rw};
18use crate::storage::entities::{self, NewEntity, NewRelationship};
19use crate::storage::memories::{self, NewMemory};
20
21use rusqlite::Connection;
22use serde::{Deserialize, Serialize};
23use std::io::Write;
24use std::path::{Path, PathBuf};
25use std::process::{Command, Stdio};
26use std::time::Instant;
27
/// Lowest Claude Code CLI version accepted by `validate_claude_version`,
/// written as `major.minor.patch`.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
29
/// JSON Schema handed to the Claude CLI through `--json-schema`.
///
/// It describes the object that is later deserialized into `ExtractionResult`:
/// a `name`, a `description`, a list of typed `entities`, and a list of
/// `relationships` whose `relation` is restricted to a fixed enum and whose
/// `strength` must lie between 0 and 1. Unknown properties are rejected at
/// every level (`additionalProperties: false`).
const EXTRACTION_SCHEMA: &str = r#"{
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "description": { "type": "string" },
 "entities": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "entity_type": {
 "type": "string",
 "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
 }
 },
 "required": ["name", "entity_type"],
 "additionalProperties": false
 }
 },
 "relationships": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "source": { "type": "string" },
 "target": { "type": "string" },
 "relation": {
 "type": "string",
 "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
 },
 "strength": { "type": "number", "minimum": 0, "maximum": 1 }
 },
 "required": ["source","target","relation","strength"],
 "additionalProperties": false
 }
 }
 },
 "required": ["name","description","entities","relationships"],
 "additionalProperties": false
}"#;
71
/// Instruction text passed to the Claude CLI as the value of `-p`.
///
/// The document itself is not embedded here; `extract_with_claude` writes it
/// to the child's stdin. The relationship types listed in the rules match the
/// `relation` enum of `EXTRACTION_SCHEMA`.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
85
/// One element of the JSON array that `extract_with_claude` tries to parse
/// from the CLI's stdout when the process exits unsuccessfully.
///
/// Only the fields inspected on that failure path are declared here.
#[derive(Debug, Deserialize)]
struct ClaudeOutputElement {
    /// Element kind; the failure path looks for the element whose type is `"result"`.
    r#type: Option<String>,
    /// Whether the element reports an error; `false` when the key is absent.
    #[serde(default)]
    is_error: bool,
    /// Result text, used as the error message when `error` is missing.
    result: Option<String>,
    /// Error text reported by the CLI, if any.
    error: Option<String>,
    /// Reason the run ended; the value `"max_turns"` is handled specially.
    terminal_reason: Option<String>,
}
95
/// Structured extraction for one document, with the same four required
/// properties as `EXTRACTION_SCHEMA`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Short name for the document (the prompt asks for kebab-case, max 60 chars).
    pub name: String,
    /// One-sentence summary of the document.
    pub description: String,
    /// Entities extracted from the document.
    pub entities: Vec<ExtractedEntity>,
    /// Typed relationships between the extracted entities.
    pub relationships: Vec<ExtractedRelationship>,
}
103
/// A single entity as returned by the extraction.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedEntity {
    /// Entity name (the prompt asks for lowercase kebab-case).
    pub name: String,
    /// Entity type as a raw string; `EXTRACTION_SCHEMA` restricts it to a fixed
    /// enum (`project`, `tool`, `person`, ...).
    pub entity_type: String,
}
109
/// A single typed relationship as returned by the extraction.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedRelationship {
    /// Name of the entity the relationship starts from.
    pub source: String,
    /// Name of the entity the relationship points to.
    pub target: String,
    /// Relationship type as a raw string; `EXTRACTION_SCHEMA` restricts it to a
    /// fixed enum (`uses`, `depends-on`, ...).
    pub relation: String,
    /// Strength score; `EXTRACTION_SCHEMA` bounds it to the range 0 to 1.
    pub strength: f64,
}
117
/// Progress record serialized to JSON when a pipeline phase completes.
///
/// Every optional field is omitted from the serialized output when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase identifier; the callers visible in this file use `"validate"` and `"scan"`.
    phase: &'a str,
    /// Path of the resolved Claude binary (set by the `"validate"` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    claude_path: Option<&'a str>,
    /// Version string reported by the Claude binary (set by the `"validate"` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Directory that was scanned (set by the `"scan"` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of files matched by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Number of matched files newly inserted into the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Number of matched files that were already present in the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
134
/// Per-file progress record serialized to JSON.
///
/// Every optional field is omitted from the serialized output when it is `None`.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the file this event refers to.
    file: &'a str,
    /// Memory name for the file; callers pass `""` when no name is available.
    name: &'a str,
    /// Outcome label; the callers visible in this file use `"preview"`,
    /// `"done"`, `"failed"` and `"skipped"`.
    status: &'a str,
    /// Row id of the stored memory, when one was written.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of entities extracted from the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships reported for the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Reported cost in USD; callers leave it out when OAuth was detected.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on this file, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message for failed or skipped files.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Zero-based position of this event within the run.
    index: usize,
    /// Total number of files matched by the scan.
    total: usize,
}
155
/// Final record serialized to JSON at the end of a run.
#[derive(Debug, Serialize)]
struct Summary {
    /// Marker flag; the caller visible in this file always sets it to `true`.
    summary: bool,
    /// Total number of files matched by the scan.
    files_total: usize,
    /// Number of files counted as completed.
    completed: usize,
    /// Number of files counted as failed.
    failed: usize,
    /// Number of files counted as skipped.
    skipped: usize,
    /// Sum of entity counts over the run.
    entities_total: usize,
    /// Sum of relationship counts over the run.
    rels_total: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Wall-clock duration of the run, in milliseconds.
    elapsed_ms: u64,
}
168
169pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
171 if let Some(p) = explicit {
172 if p.exists() {
173 return Ok(p.to_path_buf());
174 }
175 return Err(AppError::Validation(format!(
176 "Claude Code binary not found at explicit path: {}",
177 p.display()
178 )));
179 }
180
181 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
182 let p = PathBuf::from(&env_path);
183 if p.exists() {
184 return Ok(p);
185 }
186 }
187
188 let name = if cfg!(windows) {
189 "claude.exe"
190 } else {
191 "claude"
192 };
193 if let Some(path_var) = std::env::var_os("PATH") {
194 for dir in std::env::split_paths(&path_var) {
195 let candidate = dir.join(name);
196 if candidate.exists() {
197 return Ok(candidate);
198 }
199 }
200 }
201
202 Err(AppError::Validation(
203 "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
204 ))
205}
206
207fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
209 let output = Command::new(binary)
210 .arg("--version")
211 .stdin(Stdio::null())
212 .stdout(Stdio::piped())
213 .stderr(Stdio::piped())
214 .output()
215 .map_err(AppError::Io)?;
216
217 if !output.status.success() {
218 return Err(AppError::Validation(
219 "failed to run 'claude --version'".to_string(),
220 ));
221 }
222
223 let version_str = String::from_utf8(output.stdout)
224 .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
225 let version = version_str.trim().to_string();
226
227 let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
229
230 fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
231 let parts: Vec<&str> = s.splitn(3, '.').collect();
232 if parts.len() < 2 {
233 return None;
234 }
235 let major = parts[0].parse::<u64>().ok()?;
236 let minor = parts[1].parse::<u64>().ok()?;
237 let patch = parts
238 .get(2)
239 .and_then(|p| p.parse::<u64>().ok())
240 .unwrap_or(0);
241 Some((major, minor, patch))
242 }
243
244 if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
245 if actual < min {
246 return Err(AppError::Validation(format!(
247 "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
248 )));
249 }
250 }
251
252 Ok(version)
253}
254
255fn extract_with_claude(
268 binary: &Path,
269 file_content: &[u8],
270 model: Option<&str>,
271 timeout_secs: u64,
272) -> Result<(ExtractionResult, f64, bool), AppError> {
273 use wait_timeout::ChildExt;
274
275 if let Ok(_key) = std::env::var("ANTHROPIC_API_KEY") {
279 let mut cmd = Command::new("false");
280 cmd.env_clear();
281 cmd.env("PATH", "/nonexistent");
282 cmd.arg("--oauth-only-violation-anthropic-api-key-set");
283 return Err(AppError::Validation(
284 "ANTHROPIC_API_KEY is set in the environment; \
285 sqlite-graphrag operates exclusively with OAuth (Pro/Max) and \
286 the API-key path is PROHIBITED (gaps.md:47). Unset the variable \
287 and re-run with `claude login` already completed in this session."
288 .to_string(),
289 ));
290 }
291
292 let mut cmd = Command::new(binary);
293
294 apply_env_whitelist(&mut cmd, crate::spawn::env_whitelist::is_strict_env_clear());
297 crate::spawn::apply_cwd_isolation(&mut cmd)?;
298
299 let mcp_config_path = crate::spawn::preflight::write_empty_mcp_config_tempfile()?;
306
307 cmd.arg("-p")
308 .arg(EXTRACTION_PROMPT)
309 .arg("--strict-mcp-config")
310 .arg("--mcp-config")
311 .arg(mcp_config_path.as_os_str())
312 .arg("--dangerously-skip-permissions")
313 .arg("--settings")
314 .arg(r#"{"hooks":{}}"#)
315 .arg("--output-format")
316 .arg("json")
317 .arg("--json-schema")
318 .arg(EXTRACTION_SCHEMA)
319 .arg("--max-turns")
320 .arg("7")
321 .arg("--no-session-persistence");
322
323 if let Some(m) = model {
324 cmd.arg("--model").arg(m);
325 }
326
327 cmd.stdin(Stdio::piped())
328 .stdout(Stdio::piped())
329 .stderr(Stdio::piped());
330
331 let argv_refs: Vec<std::ffi::OsString> = cmd.get_args().map(|s| s.to_os_string()).collect();
335 let preflight_args = crate::spawn::preflight::PreFlightArgs {
336 binary_path: binary,
337 argv: &argv_refs,
338 workspace_root: std::path::Path::new("."),
339 mcp_config_inline_json: None,
340 expected_output_bytes: 65_536,
341 spawner_name: "ingest_claude",
342 };
343 if let Err(e) = crate::spawn::preflight::preflight_check(&preflight_args) {
344 return Err(crate::errors::AppError::from(e));
350 }
351
352 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
353 AppError::Io(std::io::Error::new(
354 e.kind(),
355 format!("failed to spawn claude: {e}"),
356 ))
357 })?;
358
359 let stdin_data = file_content.to_vec();
360 let mut child_stdin = child
361 .stdin
362 .take()
363 .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
364 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
365 child_stdin.write_all(&stdin_data)?;
366 drop(child_stdin);
367 Ok(())
368 });
369
370 let start = std::time::Instant::now();
371 let timeout = std::time::Duration::from_secs(timeout_secs);
372 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
373
374 match status {
375 Some(exit_status) => {
376 stdin_thread
377 .join()
378 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
379 .map_err(AppError::Io)?;
380
381 tracing::debug!(
382 target: "process",
383 exit_code = ?exit_status.code(),
384 elapsed_ms = start.elapsed().as_millis() as u64,
385 "external process completed"
386 );
387
388 let mut stdout_buf = Vec::new();
389 let mut stderr_buf = Vec::new();
390 if let Some(mut out) = child.stdout.take() {
391 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
392 }
393 if let Some(mut err) = child.stderr.take() {
394 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
395 }
396
397 if !exit_status.success() {
398 let stdout_str = String::from_utf8_lossy(&stdout_buf);
399 if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
400 {
401 if let Some(re) = elements
402 .iter()
403 .find(|e| e.r#type.as_deref() == Some("result"))
404 {
405 if re.terminal_reason.as_deref() == Some("max_turns") {
406 tracing::warn!(
407 target: "ingest",
408 "extraction hit max_turns limit — hooks may have consumed turns"
409 );
410 return Err(AppError::Validation(
411 "claude -p hit max_turns: hooks may be consuming turns".into(),
412 ));
413 }
414 if re.is_error {
415 let err_msg = re
416 .error
417 .as_deref()
418 .or(re.result.as_deref())
419 .unwrap_or("unknown error");
420 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
421 return Err(AppError::RateLimited {
422 detail: err_msg.to_string(),
423 });
424 }
425 if err_msg.contains("Not logged in")
426 || err_msg.contains("authentication")
427 {
428 tracing::warn!(
429 target: "ingest",
430 "Claude Code authentication failed. Re-authenticate interactively with: claude"
431 );
432 }
433 return Err(AppError::Validation(format!(
434 "claude -p failed: {err_msg}"
435 )));
436 }
437 }
438 }
439 let stderr_str = String::from_utf8_lossy(&stderr_buf);
440 if stderr_str.contains("auth") || stderr_str.contains("login") {
441 tracing::warn!(
442 target: "ingest",
443 "Claude Code authentication may have failed. Re-authenticate with: claude"
444 );
445 }
446 return Err(AppError::Validation(format!(
447 "claude -p exited with code {:?}: {}",
448 exit_status.code(),
449 stderr_str.trim()
450 )));
451 }
452
453 let stdout = String::from_utf8(stdout_buf)
454 .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
455 parse_claude_output(&stdout)
456 }
457 None => {
458 tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
459 let _ = child.kill();
460 let _ = child.wait();
461 let _ = stdin_thread.join();
462 Err(AppError::Validation(format!(
463 "claude -p timed out after {timeout_secs} seconds"
464 )))
465 }
466 }
467}
468
469fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64, bool), AppError> {
479 let result = super::claude_runner::parse_claude_output_opts(stdout, true)?;
480 let extraction: ExtractionResult = serde_json::from_value(result.value).map_err(|e| {
481 AppError::Validation(format!(
482 "failed to deserialize claude output as ExtractionResult: {e}"
483 ))
484 })?;
485 Ok((extraction, result.cost_usd, result.is_oauth))
486}
487
488use crate::output::emit_json_line as emit_json;
489
490fn collect_matching_files(
492 dir: &Path,
493 pattern: &str,
494 recursive: bool,
495 max_files: usize,
496) -> Result<Vec<PathBuf>, AppError> {
497 let mut files = Vec::new();
498 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
499 files.sort_unstable();
500
501 if files.len() > max_files {
502 return Err(AppError::Validation(format!(
503 "found {} files, exceeds --max-files cap of {}",
504 files.len(),
505 max_files
506 )));
507 }
508
509 Ok(files)
510}
511
512fn open_queue_db<P: AsRef<std::path::Path>>(path: P) -> Result<Connection, AppError> {
514 let conn = Connection::open(path)?;
515
516 conn.pragma_update(None, "journal_mode", "wal")?;
517
518 conn.execute_batch(
519 "CREATE TABLE IF NOT EXISTS queue (
520 id INTEGER PRIMARY KEY AUTOINCREMENT,
521 file_path TEXT NOT NULL UNIQUE,
522 name TEXT,
523 status TEXT NOT NULL DEFAULT 'pending',
524 memory_id INTEGER,
525 entities INTEGER DEFAULT 0,
526 rels INTEGER DEFAULT 0,
527 error TEXT,
528 cost_usd REAL DEFAULT 0.0,
529 attempt INTEGER DEFAULT 0,
530 elapsed_ms INTEGER,
531 created_at TEXT DEFAULT (datetime('now')),
532 done_at TEXT
533 );
534 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
535 )?;
536
537 Ok(conn)
538}
539
540pub fn run_claude_ingest(
542 args: &IngestArgs,
543 embedding_backend: crate::cli::EmbeddingBackendChoice,
544 llm_backend: crate::cli::LlmBackendChoice,
545) -> Result<(), AppError> {
546 let started = Instant::now();
547
548 if !args.dir.exists() {
549 return Err(AppError::Validation(format!(
550 "directory not found: {}",
551 args.dir.display()
552 )));
553 }
554
555 let early_ns = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
560 let early_paths = AppPaths::resolve(args.db.as_deref())?;
561 let queue_path = match args.queue_db.as_deref() {
562 Some(p) => std::path::PathBuf::from(p),
563 None => crate::paths::sidecar_path(&early_paths.db, ".ingest-queue.sqlite"),
564 };
565 let _singleton = crate::lock::acquire_job_singleton(
566 crate::lock::JobType::IngestClaudeCode,
567 &early_ns,
568 &early_paths.db,
569 args.wait_job_singleton,
570 args.force_job_singleton,
571 )?;
572
573 let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
575 let version = validate_claude_version(&claude_binary)?;
576 tracing::info!(
577 target: "ingest",
578 binary = %claude_binary.display(),
579 version = %version,
580 "Claude Code binary validated"
581 );
582
583 emit_json(&PhaseEvent {
584 phase: "validate",
585 claude_path: claude_binary.to_str(),
586 version: Some(&version),
587 dir: None,
588 files_total: None,
589 files_new: None,
590 files_existing: None,
591 });
592
593 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
595
596 let queue_conn = open_queue_db(&queue_path)?;
597
598 if args.resume {
599 let reset = queue_conn
600 .execute(
601 "UPDATE queue SET status='pending' WHERE status='processing'",
602 [],
603 )
604 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
605 if reset > 0 {
606 tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
607 }
608 }
609
610 if args.retry_failed {
611 let count = queue_conn
612 .execute(
613 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
614 [],
615 )
616 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
617 tracing::info!(target: "ingest", count, "retrying failed files");
618 }
619
620 if !args.resume && !args.retry_failed {
621 queue_conn
622 .execute("DELETE FROM queue", [])
623 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
624 }
625
626 let mut new_count = 0usize;
627 let mut existing_count = 0usize;
628
629 if !args.retry_failed {
630 for file in &files {
631 let file_str = file.to_string_lossy().into_owned();
632 let inserted = queue_conn
633 .execute(
634 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
635 rusqlite::params![file_str],
636 )
637 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
638 if inserted > 0 {
639 new_count += 1;
640 } else {
641 existing_count += 1;
642 }
643 }
644 }
645
646 emit_json(&PhaseEvent {
647 phase: "scan",
648 claude_path: None,
649 version: None,
650 dir: args.dir.to_str(),
651 files_total: Some(files.len()),
652 files_new: Some(new_count),
653 files_existing: Some(existing_count),
654 });
655
656 if args.dry_run {
657 for (idx, file) in files.iter().enumerate() {
658 let (name, _truncated, _orig) =
659 super::ingest::derive_kebab_name(file, args.max_name_length);
660 emit_json(&FileEvent {
661 file: &file.to_string_lossy(),
662 name: &name,
663 status: "preview",
664 memory_id: None,
665 entities: None,
666 rels: None,
667 cost_usd: None,
668 elapsed_ms: None,
669 error: None,
670 index: idx,
671 total: files.len(),
672 });
673 }
674 emit_json(&Summary {
675 summary: true,
676 files_total: files.len(),
677 completed: 0,
678 failed: 0,
679 skipped: 0,
680 entities_total: 0,
681 rels_total: 0,
682 cost_usd: 0.0,
683 elapsed_ms: started.elapsed().as_millis() as u64,
684 });
685 if !args.keep_queue {
686 let _ = std::fs::remove_file(&queue_path);
687 }
688 return Ok(());
689 }
690
691 let paths = AppPaths::resolve(args.db.as_deref())?;
693 ensure_db_ready(&paths)?;
694 let conn = open_rw(&paths.db)?;
695
696 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
697 let memory_type_str = args.r#type.as_str().to_string();
698
699 let mut completed = 0usize;
700 let mut failed = 0usize;
701 let skipped_initial: usize = queue_conn
702 .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
703 r.get::<_, usize>(0)
704 })
705 .unwrap_or(0);
706 let mut skipped = skipped_initial;
707 let mut entities_total = 0usize;
708 let mut rels_total = 0usize;
709 let mut cost_total = 0.0f64;
710 let mut oauth_detected = false;
711 let total = files.len();
712
713 let mut backoff_secs = args.rate_limit_wait;
714 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
715
716 loop {
717 if crate::shutdown_requested() {
718 tracing::info!(target: "ingest", "shutdown requested, stopping before next file");
719 break;
720 }
721
722 let pending: Option<(i64, String)> = queue_conn
723 .query_row(
724 "UPDATE queue SET status='processing', attempt=attempt+1 \
725 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
726 RETURNING id, file_path",
727 [],
728 |row| Ok((row.get(0)?, row.get(1)?)),
729 )
730 .ok();
731
732 let (queue_id, file_path) = match pending {
733 Some(p) => p,
734 None => break,
735 };
736
737 let file_started = Instant::now();
738
739 const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
741 if let Ok(meta) = std::fs::metadata(&file_path) {
742 if meta.len() > MAX_FILE_SIZE {
743 let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
744 let _ = queue_conn.execute(
745 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
746 rusqlite::params![err_msg, queue_id],
747 );
748 let current_index = completed + failed + skipped;
749 failed += 1;
750 emit_json(&FileEvent {
751 file: &file_path,
752 name: "",
753 status: "failed",
754 memory_id: None,
755 entities: None,
756 rels: None,
757 cost_usd: None,
758 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
759 error: Some(&err_msg),
760 index: current_index,
761 total,
762 });
763 if args.fail_fast {
764 break;
765 }
766 continue;
767 }
768 }
769
770 let file_content = match std::fs::read(&file_path) {
771 Ok(c) => c,
772 Err(e) => {
773 let err_msg = format!("IO error: {e}");
774 let _ = queue_conn.execute(
775 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
776 rusqlite::params![err_msg, queue_id],
777 );
778 let current_index = completed + failed + skipped;
779 failed += 1;
780 emit_json(&FileEvent {
781 file: &file_path,
782 name: "",
783 status: "failed",
784 memory_id: None,
785 entities: None,
786 rels: None,
787 cost_usd: None,
788 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
789 error: Some(&err_msg),
790 index: current_index,
791 total,
792 });
793 if args.fail_fast {
794 break;
795 }
796 continue;
797 }
798 };
799
800 if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
802 let err_msg = format!(
803 "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
804 crate::constants::MAX_MEMORY_BODY_LEN,
805 file_content.len()
806 );
807 tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
808 let _ = queue_conn.execute(
809 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
810 rusqlite::params![err_msg, queue_id],
811 );
812 let current_index = completed + failed + skipped;
813 skipped += 1;
814 emit_json(&FileEvent {
815 file: &file_path,
816 name: "",
817 status: "skipped",
818 memory_id: None,
819 entities: None,
820 rels: None,
821 cost_usd: None,
822 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
823 error: Some(&err_msg),
824 index: current_index,
825 total,
826 });
827 continue;
828 }
829
830 let max_extract_attempts: u32 = 2;
832 let mut extraction_result: Option<(ExtractionResult, f64, bool)> = None;
833 let mut last_extract_err: Option<String> = None;
834 let mut last_was_rate_limited = false;
835
836 for attempt in 1..=max_extract_attempts {
837 match extract_with_claude(
838 &claude_binary,
839 &file_content,
840 args.claude_model.as_deref(),
841 args.claude_timeout,
842 ) {
843 Ok(result) => {
844 extraction_result = Some(result);
845 break;
846 }
847 Err(ref e) if matches!(e, AppError::RateLimited { .. }) => {
848 last_extract_err = Some(format!("{e}"));
849 last_was_rate_limited = true;
850 break;
851 }
852 Err(e) => {
853 let msg = format!("{e}");
854 if attempt < max_extract_attempts {
855 let cold_start_delay = 2 * attempt as u64;
856 tracing::warn!(target: "ingest", attempt, delay_secs = cold_start_delay, error = %msg, "extraction failed, retrying (cold-start workaround)");
857 std::thread::sleep(std::time::Duration::from_secs(cold_start_delay));
858 }
859 last_extract_err = Some(msg);
860 }
861 }
862 }
863
864 if let Some((extraction, cost, is_oauth)) = extraction_result {
865 if is_oauth && !oauth_detected {
866 oauth_detected = true;
867 tracing::info!(target: "ingest", "OAuth subscription detected — cost_usd omitted from output");
868 }
869 backoff_secs = args.rate_limit_wait;
870
871 let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
872 std::path::Path::new(&extraction.name),
873 args.max_name_length,
874 );
875 let name = &normalized_name;
876 let ent_count = extraction.entities.len();
877 let rel_count = 0;
878
879 let new_entities: Vec<NewEntity> = extraction
882 .entities
883 .iter()
884 .map(|e| NewEntity {
885 name: e.name.clone(),
886 entity_type: EntityType::map_to_canonical(&e.entity_type),
887 description: None,
888 })
889 .collect();
890
891 let new_relationships: Vec<NewRelationship> = extraction
894 .relationships
895 .iter()
896 .map(|r| NewRelationship {
897 source: r.source.clone(),
898 target: r.target.clone(),
899 relation: crate::parsers::map_to_canonical_relation(&r.relation),
900 strength: r.strength,
901 description: None,
902 })
903 .collect();
904
905 let body_str = String::from_utf8(file_content.clone())
906 .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
907 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
908 let new_memory = NewMemory {
909 name: name.clone(),
910 namespace: namespace.clone(),
911 memory_type: memory_type_str.clone(),
912 description: extraction.description.clone(),
913 body: body_str.to_string(),
914 body_hash,
915 session_id: None,
916 source: "agent".to_string(),
917 metadata: serde_json::Value::Object(serde_json::Map::new()),
918 };
919
920 let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
922 Some((existing_id, is_deleted)) => {
923 if is_deleted {
924 memories::clear_deleted_at(&conn, existing_id)?;
925 }
926 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
927 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
928 rusqlite::params![existing_id],
929 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
930 )?;
931 memories::update(&conn, existing_id, &new_memory, None)?;
932 memories::sync_fts_after_update(
933 &conn,
934 existing_id,
935 &old_name,
936 &old_desc,
937 &old_body,
938 &new_memory.name,
939 &new_memory.description,
940 &new_memory.body,
941 )?;
942 tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
943 existing_id
944 }
945 None => match memories::insert(&conn, &new_memory) {
946 Ok(id) => id,
947 Err(e) => {
948 let err_msg = format!("{e}");
949 let _ = queue_conn.execute(
950 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
951 rusqlite::params![err_msg, queue_id],
952 );
953 let current_index = completed + failed + skipped;
954 failed += 1;
955 emit_json(&FileEvent {
956 file: &file_path,
957 name,
958 status: "failed",
959 memory_id: None,
960 entities: None,
961 rels: None,
962 cost_usd: if is_oauth { None } else { Some(cost) },
963 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
964 error: Some(&err_msg),
965 index: current_index,
966 total,
967 });
968 if !is_oauth {
969 cost_total += cost;
970 }
971 if args.fail_fast {
972 break;
973 }
974 continue;
975 }
976 },
977 };
978
979 for ent in &new_entities {
980 match entities::upsert_entity(&conn, &namespace, ent) {
981 Ok(eid) => {
982 let _ = entities::link_memory_entity(&conn, memory_id, eid);
983 }
984 Err(e) => {
985 tracing::warn!(
986 target: "ingest",
987 entity = %ent.name,
988 error = %e,
989 "entity skipped due to validation error"
990 );
991 }
992 }
993 }
994 for rel in &new_relationships {
995 crate::parsers::warn_if_non_canonical(&rel.relation);
996 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
997 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
998 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
999 let _ = conn.execute(
1000 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1001 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1002 );
1003 }
1004 }
1005
1006 let body_text = String::from_utf8(file_content.clone())
1008 .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
1009 let snippet: String = body_text.chars().take(200).collect();
1010 let chunks_info = crate::chunking::split_into_chunks_hierarchical(&body_text);
1011
1012 let embedding_result = if chunks_info.len() <= 1 {
1014 crate::embedder::embed_passage_with_embedding_choice(
1015 &paths.models,
1016 &body_text,
1017 embedding_backend,
1018 llm_backend,
1019 )
1020 .map(|(v, _)| v)
1021 } else {
1022 let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
1023 let mut multi_ok = true;
1024 for chunk in &chunks_info {
1025 let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
1026 match crate::embedder::embed_passage_with_embedding_choice(
1027 &paths.models,
1028 chunk_text,
1029 embedding_backend,
1030 llm_backend,
1031 )
1032 .map(|(v, _)| v)
1033 {
1034 Ok(emb) => chunk_embeddings.push(emb),
1035 Err(e) => {
1036 tracing::warn!(
1037 target: "ingest",
1038 file = %file_path,
1039 error = %e,
1040 "chunk embedding failed, skipping vector index for this file"
1041 );
1042 multi_ok = false;
1043 break;
1044 }
1045 }
1046 }
1047 if multi_ok {
1048 let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
1049 if let Err(e) = crate::storage::chunks::insert_chunk_slices(
1051 &conn,
1052 memory_id,
1053 &body_text,
1054 &chunks_info,
1055 ) {
1056 tracing::warn!(
1057 target: "ingest",
1058 file = %file_path,
1059 error = %e,
1060 "chunk slice insert failed"
1061 );
1062 } else {
1063 for (i, emb) in chunk_embeddings.iter().enumerate() {
1064 if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
1065 &conn, i as i64, memory_id, i as i32, emb,
1066 ) {
1067 tracing::warn!(
1068 target: "ingest",
1069 file = %file_path,
1070 chunk = i,
1071 error = %e,
1072 "chunk vec upsert failed"
1073 );
1074 }
1075 }
1076 }
1077 Ok(aggregated)
1078 } else {
1079 crate::embedder::embed_passage_with_embedding_choice(
1081 &paths.models,
1082 &body_text,
1083 embedding_backend,
1084 llm_backend,
1085 )
1086 .map(|(v, _)| v)
1087 }
1088 };
1089
1090 match embedding_result {
1091 Ok(embedding) => {
1092 if let Err(e) = memories::upsert_vec(
1093 &conn,
1094 memory_id,
1095 &namespace,
1096 &memory_type_str,
1097 &embedding,
1098 name,
1099 &snippet,
1100 ) {
1101 tracing::warn!(
1102 target: "ingest",
1103 file = %file_path,
1104 error = %e,
1105 "memory vec upsert failed; recall may not find this memory"
1106 );
1107 }
1108 for ent in &new_entities {
1110 if let Ok(Some(eid)) =
1111 entities::find_entity_id(&conn, &namespace, &ent.name)
1112 {
1113 let entity_text = ent.name.clone();
1114 match crate::embedder::embed_passage_with_embedding_choice(
1115 &paths.models,
1116 &entity_text,
1117 embedding_backend,
1118 llm_backend,
1119 )
1120 .map(|(v, _)| v)
1121 {
1122 Ok(emb) => {
1123 if let Err(e) = entities::upsert_entity_vec(
1124 &conn,
1125 eid,
1126 &namespace,
1127 ent.entity_type,
1128 &emb,
1129 &ent.name,
1130 ) {
1131 tracing::warn!(
1132 target: "ingest",
1133 entity = %ent.name,
1134 error = %e,
1135 "entity vec upsert failed"
1136 );
1137 }
1138 }
1139 Err(e) => {
1140 tracing::warn!(
1141 target: "ingest",
1142 entity = %ent.name,
1143 error = %e,
1144 "entity embedding failed"
1145 );
1146 }
1147 }
1148 }
1149 }
1150 }
1151 Err(e) => {
1152 tracing::warn!(
1153 target: "ingest",
1154 file = %file_path,
1155 error = %e,
1156 "memory embedding failed; recall will not find this memory"
1157 );
1158 }
1159 }
1160
1161 let _ = queue_conn.execute(
1162 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1163 rusqlite::params![
1164 name,
1165 memory_id,
1166 ent_count,
1167 rel_count,
1168 cost,
1169 file_started.elapsed().as_millis() as i64,
1170 queue_id
1171 ],
1172 );
1173
1174 let current_index = completed + failed + skipped;
1175 completed += 1;
1176 entities_total += ent_count;
1177 rels_total += rel_count;
1178 if !is_oauth {
1179 cost_total += cost;
1180 }
1181
1182 emit_json(&FileEvent {
1183 file: &file_path,
1184 name,
1185 status: "done",
1186 memory_id: Some(memory_id),
1187 entities: Some(ent_count),
1188 rels: Some(rel_count),
1189 cost_usd: if is_oauth { None } else { Some(cost) },
1190 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1191 error: None,
1192 index: current_index,
1193 total,
1194 });
1195 } else if let Some(ref err_str) = last_extract_err {
1196 if last_was_rate_limited {
1197 if crate::retry::is_kill_switch_active() {
1198 tracing::warn!(target: "ingest", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1199 } else if std::time::Instant::now() >= rate_limit_deadline {
1200 tracing::error!(target: "ingest", "rate-limit retry deadline (1h) exhausted");
1201 } else {
1202 let half = backoff_secs / 2;
1203 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1204 let actual_wait = half + jitter;
1205 tracing::warn!(target: "ingest", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
1206 let _ = queue_conn.execute(
1207 "UPDATE queue SET status='pending' WHERE id=?1",
1208 rusqlite::params![queue_id],
1209 );
1210 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1211 backoff_secs = (backoff_secs * 2).min(900);
1212 continue;
1213 }
1214 } else {
1215 let _ = queue_conn.execute(
1216 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1217 rusqlite::params![err_str, queue_id],
1218 );
1219 let current_index = completed + failed + skipped;
1220 failed += 1;
1221 emit_json(&FileEvent {
1222 file: &file_path,
1223 name: "",
1224 status: "failed",
1225 memory_id: None,
1226 entities: None,
1227 rels: None,
1228 cost_usd: None,
1229 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1230 error: Some(err_str),
1231 index: current_index,
1232 total,
1233 });
1234 if args.fail_fast {
1235 break;
1236 }
1237 }
1238 }
1239
1240 if let Some(budget) = args.max_cost_usd {
1241 if oauth_detected {
1242 tracing::debug!(target: "ingest", "--max-cost-usd ignored: OAuth subscription detected");
1243 } else if cost_total >= budget {
1244 tracing::warn!(
1245 target: "ingest",
1246 spent = cost_total,
1247 budget = budget,
1248 "budget exceeded, stopping"
1249 );
1250 break;
1251 }
1252 }
1253 }
1254
1255 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1257 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1258
1259 emit_json(&Summary {
1260 summary: true,
1261 files_total: total,
1262 completed,
1263 failed,
1264 skipped,
1265 entities_total,
1266 rels_total,
1267 cost_usd: cost_total,
1268 elapsed_ms: started.elapsed().as_millis() as u64,
1269 });
1270
1271 if !args.keep_queue && failed == 0 {
1272 let _ = std::fs::remove_file(&queue_path);
1273 }
1274
1275 Ok(())
1276}
1277
#[cfg(test)]
mod tests {
    //! Unit tests for the extraction schema, the parsing of the Claude CLI
    //! JSON output, and the lookup of the Claude binary.

    use super::*;

    /// Restores one environment variable to the value it had when the guard
    /// was created, including the "was not set" state.
    ///
    /// The restore happens in `Drop`, so it also runs when the test body
    /// panics and the stack unwinds.
    struct EnvVarGuard {
        key: &'static str,
        original: Option<std::ffi::OsString>,
    }

    impl EnvVarGuard {
        /// Snapshots the current value of `key` (or its absence).
        fn capture(key: &'static str) -> Self {
            Self {
                key,
                original: std::env::var_os(key),
            }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.original.take() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// The embedded JSON schema constant must itself be parseable JSON.
    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// A successful result carrying `structured_output` yields the extracted
    /// name, entities, relationships and the reported cost.
    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost, _is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    /// A result flagged `is_error` surfaces the text of its `error` field.
    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// An error message mentioning `rate_limit` maps to `AppError::RateLimited`.
    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Input that is not JSON at all is rejected.
    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    /// With `PATH` pointing at a nonexistent directory and
    /// `SQLITE_GRAPHRAG_CLAUDE_BINARY` unset, the binary lookup must fail.
    ///
    /// Environment variables are process-global, so both variables are
    /// snapshotted and restored through [`EnvVarGuard`]. The guards narrow the
    /// window in which other tests see the modified environment, but cannot
    /// remove it entirely while tests share one process.
    #[test]
    fn test_find_claude_binary_not_found() {
        let path_guard = EnvVarGuard::capture("PATH");
        let binary_guard = EnvVarGuard::capture("SQLITE_GRAPHRAG_CLAUDE_BINARY");

        std::env::set_var("PATH", "/nonexistent");
        std::env::remove_var("SQLITE_GRAPHRAG_CLAUDE_BINARY");
        let result = find_claude_binary(None);

        // Put the environment back before asserting so that a failing
        // assertion does not extend the time the process runs without PATH.
        drop(binary_guard);
        drop(path_guard);

        assert!(result.is_err());
    }

    /// When `structured_output` is null, the JSON string in the `result`
    /// field is parsed instead.
    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost, _is_oauth) =
            parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    /// An error result without an `error` field still reports the message
    /// carried in its `result` field.
    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    /// Despite the test name, this only checks that a result with
    /// `terminal_reason: "max_turns"` and `is_error: false` still parses
    /// successfully; it does not assert that the reason is detected.
    #[test]
    fn test_terminal_reason_max_turns_detected() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"terminal_reason":"max_turns","structured_output":{"name":"t","description":"d","entities":[],"relationships":[]}}
        ]"#;
        let err_or_ok = parse_claude_output(output);
        assert!(
            err_or_ok.is_ok(),
            "max_turns in result without is_error should still parse"
        );
    }

    /// `apiKeySource: "none"` in the init event is reported as OAuth, and the
    /// cost is still returned.
    #[test]
    fn test_detect_oauth_from_init_json() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.50,"structured_output":{"name":"test-oauth","description":"oauth test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(is_oauth, "apiKeySource=none must be detected as OAuth");
        assert!((cost - 0.50).abs() < f64::EPSILON);
    }

    /// `apiKeySource: "env"` in the init event is not reported as OAuth.
    #[test]
    fn test_api_key_source_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"env"},
            {"type":"result","is_error":false,"total_cost_usd":0.10,"structured_output":{"name":"test-api","description":"api test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "apiKeySource=env must NOT be detected as OAuth");
    }

    /// An init event without `apiKeySource` defaults to "not OAuth".
    #[test]
    fn test_missing_api_key_source_defaults_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.05,"structured_output":{"name":"test-missing","description":"missing test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "missing apiKeySource must default to not OAuth");
    }

    /// Every `entity_type` value listed in the schema must parse into
    /// `EntityType`, keeping the schema and the enum in sync.
    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value = serde_json::from_str(EXTRACTION_SCHEMA).unwrap();
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t.as_str().unwrap();
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}