1use crate::commands::ingest::IngestArgs;
13use crate::entity_type::EntityType;
14use crate::errors::AppError;
15use crate::paths::AppPaths;
16use crate::spawn::env_whitelist::apply_env_whitelist;
17use crate::storage::connection::{ensure_db_ready, open_rw};
18use crate::storage::entities::{self, NewEntity, NewRelationship};
19use crate::storage::memories::{self, NewMemory};
20
21use rusqlite::Connection;
22use serde::{Deserialize, Serialize};
23use std::io::Write;
24use std::path::{Path, PathBuf};
25use std::process::{Command, Stdio};
26use std::time::Instant;
27
/// Oldest Claude Code CLI release accepted by `validate_claude_version`,
/// written as `major.minor.patch`.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
29
/// JSON Schema handed to the CLI through `--json-schema`.
///
/// It describes the object deserialized into `ExtractionResult`: a `name`, a
/// `description`, a list of typed `entities` and a list of `relationships`
/// whose `relation` comes from a fixed vocabulary and whose `strength` lies
/// in `[0, 1]`. Unknown properties are rejected at every level.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
71
/// Instruction text passed to the CLI as the `-p` prompt.
///
/// Assembled one prompt line per literal; the relationship vocabulary listed
/// in the rules is the same set enumerated by `EXTRACTION_SCHEMA`.
const EXTRACTION_PROMPT: &str = concat!(
    "You are a knowledge graph entity extractor. Given a document, extract:\n",
    "1. A short kebab-case name (max 60 chars) capturing the document's main topic\n",
    "2. A one-sentence description (10-20 words) summarizing the key insight\n",
    "3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n",
    "4. Typed relationships between entities with strength scores\n",
    "\n",
    "Rules:\n",
    "- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n",
    "- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n",
    "- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n",
    "- NEVER use 'mentions' as relationship type\n",
    "- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n",
    "- Prefer fewer high-quality entities over many low-quality ones\n",
    "- Description must answer: What is this about and WHY does it matter?",
);
85
86#[derive(Debug, Deserialize)]
87struct ClaudeOutputElement {
88 r#type: Option<String>,
89 subtype: Option<String>,
90 #[serde(default)]
91 is_error: bool,
92 structured_output: Option<ExtractionResult>,
93 result: Option<String>,
94 total_cost_usd: Option<f64>,
95 error: Option<String>,
96 terminal_reason: Option<String>,
97 #[serde(rename = "apiKeySource")]
98 api_key_source: Option<String>,
99}
100
101#[derive(Debug, Clone, Deserialize, Serialize)]
102pub struct ExtractionResult {
103 pub name: String,
104 pub description: String,
105 pub entities: Vec<ExtractedEntity>,
106 pub relationships: Vec<ExtractedRelationship>,
107}
108
109#[derive(Debug, Clone, Deserialize, Serialize)]
110pub struct ExtractedEntity {
111 pub name: String,
112 pub entity_type: String,
113}
114
115#[derive(Debug, Clone, Deserialize, Serialize)]
116pub struct ExtractedRelationship {
117 pub source: String,
118 pub target: String,
119 pub relation: String,
120 pub strength: f64,
121}
122
123#[derive(Debug, Serialize)]
124struct PhaseEvent<'a> {
125 phase: &'a str,
126 #[serde(skip_serializing_if = "Option::is_none")]
127 claude_path: Option<&'a str>,
128 #[serde(skip_serializing_if = "Option::is_none")]
129 version: Option<&'a str>,
130 #[serde(skip_serializing_if = "Option::is_none")]
131 dir: Option<&'a str>,
132 #[serde(skip_serializing_if = "Option::is_none")]
133 files_total: Option<usize>,
134 #[serde(skip_serializing_if = "Option::is_none")]
135 files_new: Option<usize>,
136 #[serde(skip_serializing_if = "Option::is_none")]
137 files_existing: Option<usize>,
138}
139
140#[derive(Debug, Serialize)]
141struct FileEvent<'a> {
142 file: &'a str,
143 name: &'a str,
144 status: &'a str,
145 #[serde(skip_serializing_if = "Option::is_none")]
146 memory_id: Option<i64>,
147 #[serde(skip_serializing_if = "Option::is_none")]
148 entities: Option<usize>,
149 #[serde(skip_serializing_if = "Option::is_none")]
150 rels: Option<usize>,
151 #[serde(skip_serializing_if = "Option::is_none")]
152 cost_usd: Option<f64>,
153 #[serde(skip_serializing_if = "Option::is_none")]
154 elapsed_ms: Option<u64>,
155 #[serde(skip_serializing_if = "Option::is_none")]
156 error: Option<&'a str>,
157 index: usize,
158 total: usize,
159}
160
161#[derive(Debug, Serialize)]
162struct Summary {
163 summary: bool,
164 files_total: usize,
165 completed: usize,
166 failed: usize,
167 skipped: usize,
168 entities_total: usize,
169 rels_total: usize,
170 cost_usd: f64,
171 elapsed_ms: u64,
172}
173
174pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
176 if let Some(p) = explicit {
177 if p.exists() {
178 return Ok(p.to_path_buf());
179 }
180 return Err(AppError::Validation(format!(
181 "Claude Code binary not found at explicit path: {}",
182 p.display()
183 )));
184 }
185
186 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
187 let p = PathBuf::from(&env_path);
188 if p.exists() {
189 return Ok(p);
190 }
191 }
192
193 let name = if cfg!(windows) {
194 "claude.exe"
195 } else {
196 "claude"
197 };
198 if let Some(path_var) = std::env::var_os("PATH") {
199 for dir in std::env::split_paths(&path_var) {
200 let candidate = dir.join(name);
201 if candidate.exists() {
202 return Ok(candidate);
203 }
204 }
205 }
206
207 Err(AppError::Validation(
208 "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
209 ))
210}
211
212fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
214 let output = Command::new(binary)
215 .arg("--version")
216 .stdin(Stdio::null())
217 .stdout(Stdio::piped())
218 .stderr(Stdio::piped())
219 .output()
220 .map_err(AppError::Io)?;
221
222 if !output.status.success() {
223 return Err(AppError::Validation(
224 "failed to run 'claude --version'".to_string(),
225 ));
226 }
227
228 let version_str = String::from_utf8(output.stdout)
229 .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
230 let version = version_str.trim().to_string();
231
232 let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
234
235 fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
236 let parts: Vec<&str> = s.splitn(3, '.').collect();
237 if parts.len() < 2 {
238 return None;
239 }
240 let major = parts[0].parse::<u64>().ok()?;
241 let minor = parts[1].parse::<u64>().ok()?;
242 let patch = parts
243 .get(2)
244 .and_then(|p| p.parse::<u64>().ok())
245 .unwrap_or(0);
246 Some((major, minor, patch))
247 }
248
249 if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
250 if actual < min {
251 return Err(AppError::Validation(format!(
252 "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
253 )));
254 }
255 }
256
257 Ok(version)
258}
259
260fn extract_with_claude(
273 binary: &Path,
274 file_content: &[u8],
275 model: Option<&str>,
276 timeout_secs: u64,
277) -> Result<(ExtractionResult, f64, bool), AppError> {
278 use wait_timeout::ChildExt;
279
280 if let Ok(_key) = std::env::var("ANTHROPIC_API_KEY") {
284 let mut cmd = Command::new("false");
285 cmd.env_clear();
286 cmd.env("PATH", "/nonexistent");
287 cmd.arg("--oauth-only-violation-anthropic-api-key-set");
288 return Err(AppError::Validation(
289 "ANTHROPIC_API_KEY is set in the environment; \
290 sqlite-graphrag operates exclusively with OAuth (Pro/Max) and \
291 the API-key path is PROHIBITED (gaps.md:47). Unset the variable \
292 and re-run with `claude login` already completed in this session."
293 .to_string(),
294 ));
295 }
296
297 let mut cmd = Command::new(binary);
298
299 apply_env_whitelist(&mut cmd, crate::spawn::env_whitelist::is_strict_env_clear());
302 crate::spawn::apply_cwd_isolation(&mut cmd)?;
303
304 let mcp_config_path = crate::spawn::preflight::write_empty_mcp_config_tempfile()?;
311
312 cmd.arg("-p")
313 .arg(EXTRACTION_PROMPT)
314 .arg("--strict-mcp-config")
315 .arg("--mcp-config")
316 .arg(mcp_config_path.as_os_str())
317 .arg("--dangerously-skip-permissions")
318 .arg("--settings")
319 .arg(r#"{"hooks":{}}"#)
320 .arg("--output-format")
321 .arg("json")
322 .arg("--json-schema")
323 .arg(EXTRACTION_SCHEMA)
324 .arg("--max-turns")
325 .arg("7")
326 .arg("--no-session-persistence");
327
328 if let Some(m) = model {
329 cmd.arg("--model").arg(m);
330 }
331
332 cmd.stdin(Stdio::piped())
333 .stdout(Stdio::piped())
334 .stderr(Stdio::piped());
335
336 let argv_refs: Vec<std::ffi::OsString> = cmd.get_args().map(|s| s.to_os_string()).collect();
340 let preflight_args = crate::spawn::preflight::PreFlightArgs {
341 binary_path: binary,
342 argv: &argv_refs,
343 workspace_root: std::path::Path::new("."),
344 mcp_config_inline_json: None,
345 expected_output_bytes: 65_536,
346 spawner_name: "ingest_claude",
347 };
348 if let Err(e) = crate::spawn::preflight::preflight_check(&preflight_args) {
349 return Err(crate::errors::AppError::from(e));
355 }
356
357 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
358 AppError::Io(std::io::Error::new(
359 e.kind(),
360 format!("failed to spawn claude: {e}"),
361 ))
362 })?;
363
364 let stdin_data = file_content.to_vec();
365 let mut child_stdin = child
366 .stdin
367 .take()
368 .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
369 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
370 child_stdin.write_all(&stdin_data)?;
371 drop(child_stdin);
372 Ok(())
373 });
374
375 let start = std::time::Instant::now();
376 let timeout = std::time::Duration::from_secs(timeout_secs);
377 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
378
379 match status {
380 Some(exit_status) => {
381 stdin_thread
382 .join()
383 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
384 .map_err(AppError::Io)?;
385
386 tracing::debug!(
387 target: "process",
388 exit_code = ?exit_status.code(),
389 elapsed_ms = start.elapsed().as_millis() as u64,
390 "external process completed"
391 );
392
393 let mut stdout_buf = Vec::new();
394 let mut stderr_buf = Vec::new();
395 if let Some(mut out) = child.stdout.take() {
396 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
397 }
398 if let Some(mut err) = child.stderr.take() {
399 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
400 }
401
402 if !exit_status.success() {
403 let stdout_str = String::from_utf8_lossy(&stdout_buf);
404 if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
405 {
406 if let Some(re) = elements
407 .iter()
408 .find(|e| e.r#type.as_deref() == Some("result"))
409 {
410 if re.terminal_reason.as_deref() == Some("max_turns") {
411 tracing::warn!(
412 target: "ingest",
413 "extraction hit max_turns limit — hooks may have consumed turns"
414 );
415 return Err(AppError::Validation(
416 "claude -p hit max_turns: hooks may be consuming turns".into(),
417 ));
418 }
419 if re.is_error {
420 let err_msg = re
421 .error
422 .as_deref()
423 .or(re.result.as_deref())
424 .unwrap_or("unknown error");
425 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
426 return Err(AppError::RateLimited {
427 detail: err_msg.to_string(),
428 });
429 }
430 if err_msg.contains("Not logged in")
431 || err_msg.contains("authentication")
432 {
433 tracing::warn!(
434 target: "ingest",
435 "Claude Code authentication failed. Re-authenticate interactively with: claude"
436 );
437 }
438 return Err(AppError::Validation(format!(
439 "claude -p failed: {err_msg}"
440 )));
441 }
442 }
443 }
444 let stderr_str = String::from_utf8_lossy(&stderr_buf);
445 if stderr_str.contains("auth") || stderr_str.contains("login") {
446 tracing::warn!(
447 target: "ingest",
448 "Claude Code authentication may have failed. Re-authenticate with: claude"
449 );
450 }
451 return Err(AppError::Validation(format!(
452 "claude -p exited with code {:?}: {}",
453 exit_status.code(),
454 stderr_str.trim()
455 )));
456 }
457
458 let stdout = String::from_utf8(stdout_buf)
459 .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
460 parse_claude_output(&stdout)
461 }
462 None => {
463 tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
464 let _ = child.kill();
465 let _ = child.wait();
466 let _ = stdin_thread.join();
467 Err(AppError::Validation(format!(
468 "claude -p timed out after {timeout_secs} seconds"
469 )))
470 }
471 }
472}
473
474fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64, bool), AppError> {
479 let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
480 AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
481 })?;
482
483 let is_oauth = elements
484 .iter()
485 .find(|e| e.r#type.as_deref() == Some("system") && e.subtype.as_deref() == Some("init"))
486 .and_then(|e| e.api_key_source.as_deref())
487 .map(|s| s == "none")
488 .unwrap_or(false);
489
490 let result_elem = elements
491 .iter()
492 .find(|e| e.r#type.as_deref() == Some("result"))
493 .ok_or_else(|| {
494 AppError::Validation("claude output missing 'result' element".to_string())
495 })?;
496
497 if result_elem.is_error {
498 let err_msg = result_elem
499 .error
500 .as_deref()
501 .or(result_elem.result.as_deref())
502 .unwrap_or("unknown error");
503 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
504 return Err(AppError::RateLimited {
505 detail: err_msg.to_string(),
506 });
507 }
508 return Err(AppError::Validation(format!(
509 "claude extraction failed: {err_msg}"
510 )));
511 }
512
513 let extraction = result_elem
514 .structured_output
515 .clone()
516 .or_else(|| {
517 result_elem
518 .result
519 .as_ref()
520 .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
521 })
522 .ok_or_else(|| {
523 AppError::Validation("claude result missing structured_output and result field".into())
524 })?;
525
526 let cost = result_elem.total_cost_usd.unwrap_or(0.0);
527
528 Ok((extraction, cost, is_oauth))
529}
530
531use crate::output::emit_json_line as emit_json;
532
533fn collect_matching_files(
535 dir: &Path,
536 pattern: &str,
537 recursive: bool,
538 max_files: usize,
539) -> Result<Vec<PathBuf>, AppError> {
540 let mut files = Vec::new();
541 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
542 files.sort_unstable();
543
544 if files.len() > max_files {
545 return Err(AppError::Validation(format!(
546 "found {} files, exceeds --max-files cap of {}",
547 files.len(),
548 max_files
549 )));
550 }
551
552 Ok(files)
553}
554
555fn open_queue_db(path: &str) -> Result<Connection, AppError> {
557 let conn = Connection::open(path)?;
558
559 conn.pragma_update(None, "journal_mode", "wal")?;
560
561 conn.execute_batch(
562 "CREATE TABLE IF NOT EXISTS queue (
563 id INTEGER PRIMARY KEY AUTOINCREMENT,
564 file_path TEXT NOT NULL UNIQUE,
565 name TEXT,
566 status TEXT NOT NULL DEFAULT 'pending',
567 memory_id INTEGER,
568 entities INTEGER DEFAULT 0,
569 rels INTEGER DEFAULT 0,
570 error TEXT,
571 cost_usd REAL DEFAULT 0.0,
572 attempt INTEGER DEFAULT 0,
573 elapsed_ms INTEGER,
574 created_at TEXT DEFAULT (datetime('now')),
575 done_at TEXT
576 );
577 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
578 )?;
579
580 Ok(conn)
581}
582
583pub fn run_claude_ingest(
585 args: &IngestArgs,
586 embedding_backend: crate::cli::EmbeddingBackendChoice,
587 llm_backend: crate::cli::LlmBackendChoice,
588) -> Result<(), AppError> {
589 let started = Instant::now();
590
591 if !args.dir.exists() {
592 return Err(AppError::Validation(format!(
593 "directory not found: {}",
594 args.dir.display()
595 )));
596 }
597
598 let early_ns = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
603 let early_paths = AppPaths::resolve(args.db.as_deref())?;
604 let _singleton = crate::lock::acquire_job_singleton(
605 crate::lock::JobType::IngestClaudeCode,
606 &early_ns,
607 &early_paths.db,
608 args.wait_job_singleton,
609 args.force_job_singleton,
610 )?;
611
612 let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
614 let version = validate_claude_version(&claude_binary)?;
615 tracing::info!(
616 target: "ingest",
617 binary = %claude_binary.display(),
618 version = %version,
619 "Claude Code binary validated"
620 );
621
622 emit_json(&PhaseEvent {
623 phase: "validate",
624 claude_path: claude_binary.to_str(),
625 version: Some(&version),
626 dir: None,
627 files_total: None,
628 files_new: None,
629 files_existing: None,
630 });
631
632 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
634
635 let queue_conn = open_queue_db(&args.queue_db)?;
636
637 if args.resume {
638 let reset = queue_conn
639 .execute(
640 "UPDATE queue SET status='pending' WHERE status='processing'",
641 [],
642 )
643 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
644 if reset > 0 {
645 tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
646 }
647 }
648
649 if args.retry_failed {
650 let count = queue_conn
651 .execute(
652 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
653 [],
654 )
655 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
656 tracing::info!(target: "ingest", count, "retrying failed files");
657 }
658
659 if !args.resume && !args.retry_failed {
660 queue_conn
661 .execute("DELETE FROM queue", [])
662 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
663 }
664
665 let mut new_count = 0usize;
666 let mut existing_count = 0usize;
667
668 if !args.retry_failed {
669 for file in &files {
670 let file_str = file.to_string_lossy().into_owned();
671 let inserted = queue_conn
672 .execute(
673 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
674 rusqlite::params![file_str],
675 )
676 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
677 if inserted > 0 {
678 new_count += 1;
679 } else {
680 existing_count += 1;
681 }
682 }
683 }
684
685 emit_json(&PhaseEvent {
686 phase: "scan",
687 claude_path: None,
688 version: None,
689 dir: args.dir.to_str(),
690 files_total: Some(files.len()),
691 files_new: Some(new_count),
692 files_existing: Some(existing_count),
693 });
694
695 if args.dry_run {
696 for (idx, file) in files.iter().enumerate() {
697 let (name, _truncated, _orig) =
698 super::ingest::derive_kebab_name(file, args.max_name_length);
699 emit_json(&FileEvent {
700 file: &file.to_string_lossy(),
701 name: &name,
702 status: "preview",
703 memory_id: None,
704 entities: None,
705 rels: None,
706 cost_usd: None,
707 elapsed_ms: None,
708 error: None,
709 index: idx,
710 total: files.len(),
711 });
712 }
713 emit_json(&Summary {
714 summary: true,
715 files_total: files.len(),
716 completed: 0,
717 failed: 0,
718 skipped: 0,
719 entities_total: 0,
720 rels_total: 0,
721 cost_usd: 0.0,
722 elapsed_ms: started.elapsed().as_millis() as u64,
723 });
724 if !args.keep_queue {
725 let _ = std::fs::remove_file(&args.queue_db);
726 }
727 return Ok(());
728 }
729
730 let paths = AppPaths::resolve(args.db.as_deref())?;
732 ensure_db_ready(&paths)?;
733 let conn = open_rw(&paths.db)?;
734
735 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
736 let memory_type_str = args.r#type.as_str().to_string();
737
738 let mut completed = 0usize;
739 let mut failed = 0usize;
740 let skipped_initial: usize = queue_conn
741 .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
742 r.get::<_, usize>(0)
743 })
744 .unwrap_or(0);
745 let mut skipped = skipped_initial;
746 let mut entities_total = 0usize;
747 let mut rels_total = 0usize;
748 let mut cost_total = 0.0f64;
749 let mut oauth_detected = false;
750 let total = files.len();
751
752 let mut backoff_secs = args.rate_limit_wait;
753 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
754
755 loop {
756 if crate::shutdown_requested() {
757 tracing::info!(target: "ingest", "shutdown requested, stopping before next file");
758 break;
759 }
760
761 let pending: Option<(i64, String)> = queue_conn
762 .query_row(
763 "UPDATE queue SET status='processing', attempt=attempt+1 \
764 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
765 RETURNING id, file_path",
766 [],
767 |row| Ok((row.get(0)?, row.get(1)?)),
768 )
769 .ok();
770
771 let (queue_id, file_path) = match pending {
772 Some(p) => p,
773 None => break,
774 };
775
776 let file_started = Instant::now();
777
778 const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
780 if let Ok(meta) = std::fs::metadata(&file_path) {
781 if meta.len() > MAX_FILE_SIZE {
782 let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
783 let _ = queue_conn.execute(
784 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
785 rusqlite::params![err_msg, queue_id],
786 );
787 let current_index = completed + failed + skipped;
788 failed += 1;
789 emit_json(&FileEvent {
790 file: &file_path,
791 name: "",
792 status: "failed",
793 memory_id: None,
794 entities: None,
795 rels: None,
796 cost_usd: None,
797 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
798 error: Some(&err_msg),
799 index: current_index,
800 total,
801 });
802 if args.fail_fast {
803 break;
804 }
805 continue;
806 }
807 }
808
809 let file_content = match std::fs::read(&file_path) {
810 Ok(c) => c,
811 Err(e) => {
812 let err_msg = format!("IO error: {e}");
813 let _ = queue_conn.execute(
814 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
815 rusqlite::params![err_msg, queue_id],
816 );
817 let current_index = completed + failed + skipped;
818 failed += 1;
819 emit_json(&FileEvent {
820 file: &file_path,
821 name: "",
822 status: "failed",
823 memory_id: None,
824 entities: None,
825 rels: None,
826 cost_usd: None,
827 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
828 error: Some(&err_msg),
829 index: current_index,
830 total,
831 });
832 if args.fail_fast {
833 break;
834 }
835 continue;
836 }
837 };
838
839 if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
841 let err_msg = format!(
842 "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
843 crate::constants::MAX_MEMORY_BODY_LEN,
844 file_content.len()
845 );
846 tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
847 let _ = queue_conn.execute(
848 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
849 rusqlite::params![err_msg, queue_id],
850 );
851 let current_index = completed + failed + skipped;
852 skipped += 1;
853 emit_json(&FileEvent {
854 file: &file_path,
855 name: "",
856 status: "skipped",
857 memory_id: None,
858 entities: None,
859 rels: None,
860 cost_usd: None,
861 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
862 error: Some(&err_msg),
863 index: current_index,
864 total,
865 });
866 continue;
867 }
868
869 let max_extract_attempts: u32 = 2;
871 let mut extraction_result: Option<(ExtractionResult, f64, bool)> = None;
872 let mut last_extract_err: Option<String> = None;
873 let mut last_was_rate_limited = false;
874
875 for attempt in 1..=max_extract_attempts {
876 match extract_with_claude(
877 &claude_binary,
878 &file_content,
879 args.claude_model.as_deref(),
880 args.claude_timeout,
881 ) {
882 Ok(result) => {
883 extraction_result = Some(result);
884 break;
885 }
886 Err(ref e) if matches!(e, AppError::RateLimited { .. }) => {
887 last_extract_err = Some(format!("{e}"));
888 last_was_rate_limited = true;
889 break;
890 }
891 Err(e) => {
892 let msg = format!("{e}");
893 if attempt < max_extract_attempts {
894 let cold_start_delay = 2 * attempt as u64;
895 tracing::warn!(target: "ingest", attempt, delay_secs = cold_start_delay, error = %msg, "extraction failed, retrying (cold-start workaround)");
896 std::thread::sleep(std::time::Duration::from_secs(cold_start_delay));
897 }
898 last_extract_err = Some(msg);
899 }
900 }
901 }
902
903 if let Some((extraction, cost, is_oauth)) = extraction_result {
904 if is_oauth && !oauth_detected {
905 oauth_detected = true;
906 tracing::info!(target: "ingest", "OAuth subscription detected — cost_usd omitted from output");
907 }
908 backoff_secs = args.rate_limit_wait;
909
910 let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
911 std::path::Path::new(&extraction.name),
912 args.max_name_length,
913 );
914 let name = &normalized_name;
915 let ent_count = extraction.entities.len();
916 let rel_count = 0;
917
918 let new_entities: Vec<NewEntity> = extraction
919 .entities
920 .iter()
921 .filter_map(|e| match e.entity_type.parse::<EntityType>() {
922 Ok(et) => Some(NewEntity {
923 name: e.name.clone(),
924 entity_type: et,
925 description: None,
926 }),
927 Err(_) => {
928 tracing::warn!(
929 target: "ingest",
930 entity = %e.name,
931 entity_type = %e.entity_type,
932 "entity type not recognized, skipping"
933 );
934 None
935 }
936 })
937 .collect();
938
939 let new_relationships: Vec<NewRelationship> = extraction
940 .relationships
941 .iter()
942 .map(|r| NewRelationship {
943 source: r.source.clone(),
944 target: r.target.clone(),
945 relation: crate::parsers::normalize_relation(&r.relation),
946 strength: r.strength,
947 description: None,
948 })
949 .collect();
950
951 let body_str = String::from_utf8(file_content.clone())
952 .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
953 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
954 let new_memory = NewMemory {
955 name: name.clone(),
956 namespace: namespace.clone(),
957 memory_type: memory_type_str.clone(),
958 description: extraction.description.clone(),
959 body: body_str.to_string(),
960 body_hash,
961 session_id: None,
962 source: "agent".to_string(),
963 metadata: serde_json::Value::Object(serde_json::Map::new()),
964 };
965
966 let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
968 Some((existing_id, is_deleted)) => {
969 if is_deleted {
970 memories::clear_deleted_at(&conn, existing_id)?;
971 }
972 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
973 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
974 rusqlite::params![existing_id],
975 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
976 )?;
977 memories::update(&conn, existing_id, &new_memory, None)?;
978 memories::sync_fts_after_update(
979 &conn,
980 existing_id,
981 &old_name,
982 &old_desc,
983 &old_body,
984 &new_memory.name,
985 &new_memory.description,
986 &new_memory.body,
987 )?;
988 tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
989 existing_id
990 }
991 None => match memories::insert(&conn, &new_memory) {
992 Ok(id) => id,
993 Err(e) => {
994 let err_msg = format!("{e}");
995 let _ = queue_conn.execute(
996 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
997 rusqlite::params![err_msg, queue_id],
998 );
999 let current_index = completed + failed + skipped;
1000 failed += 1;
1001 emit_json(&FileEvent {
1002 file: &file_path,
1003 name,
1004 status: "failed",
1005 memory_id: None,
1006 entities: None,
1007 rels: None,
1008 cost_usd: if is_oauth { None } else { Some(cost) },
1009 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1010 error: Some(&err_msg),
1011 index: current_index,
1012 total,
1013 });
1014 if !is_oauth {
1015 cost_total += cost;
1016 }
1017 if args.fail_fast {
1018 break;
1019 }
1020 continue;
1021 }
1022 },
1023 };
1024
1025 for ent in &new_entities {
1026 match entities::upsert_entity(&conn, &namespace, ent) {
1027 Ok(eid) => {
1028 let _ = entities::link_memory_entity(&conn, memory_id, eid);
1029 }
1030 Err(e) => {
1031 tracing::warn!(
1032 target: "ingest",
1033 entity = %ent.name,
1034 error = %e,
1035 "entity skipped due to validation error"
1036 );
1037 }
1038 }
1039 }
1040 for rel in &new_relationships {
1041 crate::parsers::warn_if_non_canonical(&rel.relation);
1042 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
1043 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
1044 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1045 let _ = conn.execute(
1046 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1047 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1048 );
1049 }
1050 }
1051
1052 let body_text = String::from_utf8(file_content.clone())
1054 .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
1055 let snippet: String = body_text.chars().take(200).collect();
1056 let chunks_info = crate::chunking::split_into_chunks_hierarchical(&body_text);
1057
1058 let embedding_result = if chunks_info.len() <= 1 {
1060 crate::embedder::embed_passage_with_embedding_choice(
1061 &paths.models,
1062 &body_text,
1063 embedding_backend,
1064 llm_backend,
1065 )
1066 .map(|(v, _)| v)
1067 } else {
1068 let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
1069 let mut multi_ok = true;
1070 for chunk in &chunks_info {
1071 let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
1072 match crate::embedder::embed_passage_with_embedding_choice(
1073 &paths.models,
1074 chunk_text,
1075 embedding_backend,
1076 llm_backend,
1077 )
1078 .map(|(v, _)| v)
1079 {
1080 Ok(emb) => chunk_embeddings.push(emb),
1081 Err(e) => {
1082 tracing::warn!(
1083 target: "ingest",
1084 file = %file_path,
1085 error = %e,
1086 "chunk embedding failed, skipping vector index for this file"
1087 );
1088 multi_ok = false;
1089 break;
1090 }
1091 }
1092 }
1093 if multi_ok {
1094 let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
1095 if let Err(e) = crate::storage::chunks::insert_chunk_slices(
1097 &conn,
1098 memory_id,
1099 &body_text,
1100 &chunks_info,
1101 ) {
1102 tracing::warn!(
1103 target: "ingest",
1104 file = %file_path,
1105 error = %e,
1106 "chunk slice insert failed"
1107 );
1108 } else {
1109 for (i, emb) in chunk_embeddings.iter().enumerate() {
1110 if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
1111 &conn, i as i64, memory_id, i as i32, emb,
1112 ) {
1113 tracing::warn!(
1114 target: "ingest",
1115 file = %file_path,
1116 chunk = i,
1117 error = %e,
1118 "chunk vec upsert failed"
1119 );
1120 }
1121 }
1122 }
1123 Ok(aggregated)
1124 } else {
1125 crate::embedder::embed_passage_with_embedding_choice(
1127 &paths.models,
1128 &body_text,
1129 embedding_backend,
1130 llm_backend,
1131 )
1132 .map(|(v, _)| v)
1133 }
1134 };
1135
1136 match embedding_result {
1137 Ok(embedding) => {
1138 if let Err(e) = memories::upsert_vec(
1139 &conn,
1140 memory_id,
1141 &namespace,
1142 &memory_type_str,
1143 &embedding,
1144 name,
1145 &snippet,
1146 ) {
1147 tracing::warn!(
1148 target: "ingest",
1149 file = %file_path,
1150 error = %e,
1151 "memory vec upsert failed; recall may not find this memory"
1152 );
1153 }
1154 for ent in &new_entities {
1156 if let Ok(Some(eid)) =
1157 entities::find_entity_id(&conn, &namespace, &ent.name)
1158 {
1159 let entity_text = ent.name.clone();
1160 match crate::embedder::embed_passage_with_embedding_choice(
1161 &paths.models,
1162 &entity_text,
1163 embedding_backend,
1164 llm_backend,
1165 )
1166 .map(|(v, _)| v)
1167 {
1168 Ok(emb) => {
1169 if let Err(e) = entities::upsert_entity_vec(
1170 &conn,
1171 eid,
1172 &namespace,
1173 ent.entity_type,
1174 &emb,
1175 &ent.name,
1176 ) {
1177 tracing::warn!(
1178 target: "ingest",
1179 entity = %ent.name,
1180 error = %e,
1181 "entity vec upsert failed"
1182 );
1183 }
1184 }
1185 Err(e) => {
1186 tracing::warn!(
1187 target: "ingest",
1188 entity = %ent.name,
1189 error = %e,
1190 "entity embedding failed"
1191 );
1192 }
1193 }
1194 }
1195 }
1196 }
1197 Err(e) => {
1198 tracing::warn!(
1199 target: "ingest",
1200 file = %file_path,
1201 error = %e,
1202 "memory embedding failed; recall will not find this memory"
1203 );
1204 }
1205 }
1206
1207 let _ = queue_conn.execute(
1208 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1209 rusqlite::params![
1210 name,
1211 memory_id,
1212 ent_count,
1213 rel_count,
1214 cost,
1215 file_started.elapsed().as_millis() as i64,
1216 queue_id
1217 ],
1218 );
1219
1220 let current_index = completed + failed + skipped;
1221 completed += 1;
1222 entities_total += ent_count;
1223 rels_total += rel_count;
1224 if !is_oauth {
1225 cost_total += cost;
1226 }
1227
1228 emit_json(&FileEvent {
1229 file: &file_path,
1230 name,
1231 status: "done",
1232 memory_id: Some(memory_id),
1233 entities: Some(ent_count),
1234 rels: Some(rel_count),
1235 cost_usd: if is_oauth { None } else { Some(cost) },
1236 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1237 error: None,
1238 index: current_index,
1239 total,
1240 });
1241 } else if let Some(ref err_str) = last_extract_err {
1242 if last_was_rate_limited {
1243 if crate::retry::is_kill_switch_active() {
1244 tracing::warn!(target: "ingest", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1245 } else if std::time::Instant::now() >= rate_limit_deadline {
1246 tracing::error!(target: "ingest", "rate-limit retry deadline (1h) exhausted");
1247 } else {
1248 let half = backoff_secs / 2;
1249 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1250 let actual_wait = half + jitter;
1251 tracing::warn!(target: "ingest", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
1252 let _ = queue_conn.execute(
1253 "UPDATE queue SET status='pending' WHERE id=?1",
1254 rusqlite::params![queue_id],
1255 );
1256 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1257 backoff_secs = (backoff_secs * 2).min(900);
1258 continue;
1259 }
1260 } else {
1261 let _ = queue_conn.execute(
1262 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1263 rusqlite::params![err_str, queue_id],
1264 );
1265 let current_index = completed + failed + skipped;
1266 failed += 1;
1267 emit_json(&FileEvent {
1268 file: &file_path,
1269 name: "",
1270 status: "failed",
1271 memory_id: None,
1272 entities: None,
1273 rels: None,
1274 cost_usd: None,
1275 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1276 error: Some(err_str),
1277 index: current_index,
1278 total,
1279 });
1280 if args.fail_fast {
1281 break;
1282 }
1283 }
1284 }
1285
1286 if let Some(budget) = args.max_cost_usd {
1287 if oauth_detected {
1288 tracing::debug!(target: "ingest", "--max-cost-usd ignored: OAuth subscription detected");
1289 } else if cost_total >= budget {
1290 tracing::warn!(
1291 target: "ingest",
1292 spent = cost_total,
1293 budget = budget,
1294 "budget exceeded, stopping"
1295 );
1296 break;
1297 }
1298 }
1299 }
1300
1301 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1303 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1304
1305 emit_json(&Summary {
1306 summary: true,
1307 files_total: total,
1308 completed,
1309 failed,
1310 skipped,
1311 entities_total,
1312 rels_total,
1313 cost_usd: cost_total,
1314 elapsed_ms: started.elapsed().as_millis() as u64,
1315 });
1316
1317 if !args.keep_queue && failed == 0 {
1318 let _ = std::fs::remove_file(&args.queue_db);
1319 }
1320
1321 Ok(())
1322}
1323
#[cfg(test)]
mod tests {
    use super::*;

    /// Restores one environment variable to its pre-test state when dropped.
    ///
    /// The environment is process-wide, so a test that edits it must put it
    /// back even when an assertion or the code under test panics. Doing the
    /// restoration in `Drop` covers both the normal and the unwinding path.
    struct EnvVarGuard {
        key: &'static str,
        saved: Option<std::ffi::OsString>,
    }

    impl EnvVarGuard {
        /// Sets `key` to `value`, remembering whatever was there before.
        fn set(key: &'static str, value: &str) -> Self {
            let saved = std::env::var_os(key);
            std::env::set_var(key, value);
            Self { key, saved }
        }

        /// Removes `key`, remembering whatever was there before.
        fn unset(key: &'static str) -> Self {
            let saved = std::env::var_os(key);
            std::env::remove_var(key);
            Self { key, saved }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            // A variable that was absent before the test must be absent again,
            // not left holding the value the test injected.
            match self.saved.take() {
                Some(previous) => std::env::set_var(self.key, previous),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// The embedded extraction schema must be syntactically valid JSON.
    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// A successful result carrying `structured_output` yields the extraction and its cost.
    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost, _is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    /// An `is_error` result surfaces the text of its `error` field in the error message.
    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// An error mentioning `rate_limit` maps to `AppError::RateLimited`.
    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Input that is not JSON at all is rejected with an error.
    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    /// With no usable `PATH` and no override variable, binary lookup must fail.
    ///
    /// Both variables are restored by [`EnvVarGuard`] before the assertion
    /// runs, so a failure here cannot leak a broken environment into the rest
    /// of the test process.
    // NOTE(review): the environment is shared by every test thread; any other
    // test reading `PATH` concurrently can still observe the temporary value.
    #[test]
    fn test_find_claude_binary_not_found() {
        let result = {
            let _path = EnvVarGuard::set("PATH", "/nonexistent");
            let _binary_override = EnvVarGuard::unset("SQLITE_GRAPHRAG_CLAUDE_BINARY");
            find_claude_binary(None)
        };
        assert!(result.is_err());
    }

    /// When `structured_output` is null, the JSON string in `result` is parsed instead.
    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost, _is_oauth) =
            parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    /// An `is_error` result with only a `result` field still reports that text.
    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    /// A `terminal_reason` of `max_turns` without `is_error` is not treated as a failure.
    #[test]
    fn test_terminal_reason_max_turns_detected() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"terminal_reason":"max_turns","structured_output":{"name":"t","description":"d","entities":[],"relationships":[]}}
        ]"#;
        let err_or_ok = parse_claude_output(output);
        assert!(
            err_or_ok.is_ok(),
            "max_turns in result without is_error should still parse"
        );
    }

    /// `apiKeySource: "none"` in the init event is reported as OAuth; cost is still returned.
    #[test]
    fn test_detect_oauth_from_init_json() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.50,"structured_output":{"name":"test-oauth","description":"oauth test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(is_oauth, "apiKeySource=none must be detected as OAuth");
        assert!((cost - 0.50).abs() < f64::EPSILON);
    }

    /// `apiKeySource: "env"` in the init event is not reported as OAuth.
    #[test]
    fn test_api_key_source_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"env"},
            {"type":"result","is_error":false,"total_cost_usd":0.10,"structured_output":{"name":"test-api","description":"api test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "apiKeySource=env must NOT be detected as OAuth");
    }

    /// An init event without `apiKeySource` defaults to not-OAuth.
    #[test]
    fn test_missing_api_key_source_defaults_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.05,"structured_output":{"name":"test-missing","description":"missing test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "missing apiKeySource must default to not OAuth");
    }

    /// Every `entity_type` value advertised by the schema must parse as an `EntityType`.
    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t
                .as_str()
                .expect("entity_type enum values must be strings");
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}