1use crate::commands::ingest::IngestArgs;
12use crate::entity_type::EntityType;
13use crate::errors::AppError;
14use crate::paths::AppPaths;
15use crate::storage::connection::{ensure_db_ready, open_rw};
16use crate::storage::entities::{self, NewEntity, NewRelationship};
17use crate::storage::memories::{self, NewMemory};
18
19use rusqlite::Connection;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::{Path, PathBuf};
23use std::process::{Command, Stdio};
24use std::time::Instant;
25
/// Minimum Claude Code CLI version (`major.minor.patch`) that
/// `validate_claude_version` accepts; older versions are rejected before any
/// file is processed.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
27
/// JSON Schema passed to `claude -p --json-schema`, constraining the model's
/// structured output to the shape of [`ExtractionResult`].
///
/// Keep in sync with:
/// - `EntityType` parsing: entities whose `entity_type` fails to parse are
///   dropped (with a warning) during ingest.
/// - The relationship-type list spelled out in `EXTRACTION_PROMPT`.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
69
/// Instruction text passed as the `-p` argument to `claude`; the document being
/// ingested is written to the child's stdin separately.
///
/// The relationship types listed here must match the `relation` enum in
/// `EXTRACTION_SCHEMA`. Each line ends in `\n\` so the source can be wrapped
/// without adding the continuation line's leading whitespace to the prompt.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
83
/// One element of the JSON printed by `claude -p --output-format json`.
///
/// Only the fields this module inspects are declared; other keys are ignored
/// by serde (no `deny_unknown_fields`). Every field is optional because
/// different element kinds carry different subsets.
#[derive(Debug, Deserialize)]
struct ClaudeOutputElement {
    /// Element kind; this module looks for `"system"` and `"result"`.
    r#type: Option<String>,
    /// Element sub-kind; `"init"` on a `system` element marks the session header.
    subtype: Option<String>,
    /// Set on a `result` element when the run failed; `false` when absent.
    #[serde(default)]
    is_error: bool,
    /// Structured output produced for `--json-schema`, if any.
    structured_output: Option<ExtractionResult>,
    /// Free-text result: used as an error-message fallback, or parsed as an
    /// `ExtractionResult` when `structured_output` is missing.
    result: Option<String>,
    /// Reported cost of the run in USD.
    total_cost_usd: Option<f64>,
    /// Error text; preferred over `result` when `is_error` is set.
    error: Option<String>,
    /// Why the run ended; `"max_turns"` is reported as a turn-limit failure.
    terminal_reason: Option<String>,
    /// Credential source from the `system`/`init` element; the value `"none"`
    /// is treated as an OAuth (subscription) session.
    #[serde(rename = "apiKeySource")]
    api_key_source: Option<String>,
}
98
/// Knowledge-graph record extracted from one document by `claude -p`.
///
/// Mirrors the top-level object of `EXTRACTION_SCHEMA`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Memory name proposed by the model; normalized with `derive_kebab_name`
    /// before it is stored.
    pub name: String,
    /// One-sentence summary, stored as the memory description.
    pub description: String,
    /// Entities found in the document.
    pub entities: Vec<ExtractedEntity>,
    /// Typed edges between entities, which are referenced by name.
    pub relationships: Vec<ExtractedRelationship>,
}
106
/// An entity as returned by the model, before validation.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedEntity {
    /// Entity name (the prompt asks for lowercase kebab-case).
    pub name: String,
    /// Raw type string; parsed into `EntityType` during ingest, and the entity
    /// is skipped if that parse fails.
    pub entity_type: String,
}
112
/// A directed relationship between two extracted entities, as returned by the model.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedRelationship {
    /// Name of the source entity.
    pub source: String,
    /// Name of the target entity.
    pub target: String,
    /// Relationship type; passed through `normalize_relation` before storage.
    pub relation: String,
    /// Edge strength in `[0, 1]` (per the schema); stored as the relationship weight.
    pub strength: f64,
}
120
/// Phase-level progress event, written as one JSON line on stdout by `emit_json`.
///
/// `phase` is `"validate"` (binary located and version-checked) or `"scan"`
/// (files collected and queued). Fields that are `None` are omitted from the JSON.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    phase: &'a str,
    /// Path of the Claude Code binary (`validate` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    claude_path: Option<&'a str>,
    /// Trimmed `claude --version` output (`validate` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Directory being ingested (`scan` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of matching files found on disk (`scan` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Files newly inserted into the queue (`scan` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Files already present in the queue, left untouched (`scan` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
137
/// Per-file progress event, written as one JSON line on stdout by `emit_json`.
///
/// `status` is one of `"preview"` (dry run), `"done"`, `"failed"` or
/// `"skipped"`. Fields that are `None` are omitted from the JSON.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the input file.
    file: &'a str,
    /// Memory name (empty when no name was derived yet).
    name: &'a str,
    status: &'a str,
    /// Id of the created or updated memory (`done` only).
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of entities the model returned.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships the model returned.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Extraction cost in USD; omitted for OAuth sessions.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Failure or skip reason.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Zero-based position: files already completed, failed or skipped before this one.
    index: usize,
    /// Number of files collected for this run.
    total: usize,
}
158
/// Final run summary, written as the last JSON line on stdout.
#[derive(Debug, Serialize)]
struct Summary {
    /// Always `true`; marks this line as the summary record.
    summary: bool,
    files_total: usize,
    completed: usize,
    failed: usize,
    /// Includes rows already `done` in the queue when the run started (resume),
    /// plus files skipped for exceeding the body-size limit.
    skipped: usize,
    entities_total: usize,
    rels_total: usize,
    /// Accumulated extraction cost in USD (OAuth runs contribute nothing).
    cost_usd: f64,
    elapsed_ms: u64,
}
171
172pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
174 if let Some(p) = explicit {
175 if p.exists() {
176 return Ok(p.to_path_buf());
177 }
178 return Err(AppError::Validation(format!(
179 "Claude Code binary not found at explicit path: {}",
180 p.display()
181 )));
182 }
183
184 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
185 let p = PathBuf::from(&env_path);
186 if p.exists() {
187 return Ok(p);
188 }
189 }
190
191 let name = if cfg!(windows) {
192 "claude.exe"
193 } else {
194 "claude"
195 };
196 if let Some(path_var) = std::env::var_os("PATH") {
197 for dir in std::env::split_paths(&path_var) {
198 let candidate = dir.join(name);
199 if candidate.exists() {
200 return Ok(candidate);
201 }
202 }
203 }
204
205 Err(AppError::Validation(
206 "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
207 ))
208}
209
210fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
212 let output = Command::new(binary)
213 .arg("--version")
214 .stdin(Stdio::null())
215 .stdout(Stdio::piped())
216 .stderr(Stdio::piped())
217 .output()
218 .map_err(AppError::Io)?;
219
220 if !output.status.success() {
221 return Err(AppError::Validation(
222 "failed to run 'claude --version'".to_string(),
223 ));
224 }
225
226 let version_str = String::from_utf8(output.stdout)
227 .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
228 let version = version_str.trim().to_string();
229
230 let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
232
233 fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
234 let parts: Vec<&str> = s.splitn(3, '.').collect();
235 if parts.len() < 2 {
236 return None;
237 }
238 let major = parts[0].parse::<u64>().ok()?;
239 let minor = parts[1].parse::<u64>().ok()?;
240 let patch = parts
241 .get(2)
242 .and_then(|p| p.parse::<u64>().ok())
243 .unwrap_or(0);
244 Some((major, minor, patch))
245 }
246
247 if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
248 if actual < min {
249 return Err(AppError::Validation(format!(
250 "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
251 )));
252 }
253 }
254
255 Ok(version)
256}
257
258fn extract_with_claude(
265 binary: &Path,
266 file_content: &[u8],
267 model: Option<&str>,
268 timeout_secs: u64,
269) -> Result<(ExtractionResult, f64, bool), AppError> {
270 use wait_timeout::ChildExt;
271
272 let mut cmd = Command::new(binary);
273
274 cmd.env_clear();
275 for var in &[
276 "PATH",
277 "HOME",
278 "USER",
279 "SHELL",
280 "TERM",
281 "LANG",
282 "XDG_CONFIG_HOME",
283 "XDG_DATA_HOME",
284 "XDG_RUNTIME_DIR",
285 "ANTHROPIC_API_KEY",
286 "CLAUDE_CONFIG_DIR",
287 "TMPDIR",
288 "TMP",
289 "TEMP",
290 "DYLD_FALLBACK_LIBRARY_PATH",
291 ] {
292 if let Ok(val) = std::env::var(var) {
293 cmd.env(var, val);
294 }
295 }
296
297 #[cfg(windows)]
298 for var in &[
299 "LOCALAPPDATA",
300 "APPDATA",
301 "USERPROFILE",
302 "SystemRoot",
303 "COMSPEC",
304 "PATHEXT",
305 "HOMEPATH",
306 "HOMEDRIVE",
307 ] {
308 if let Ok(val) = std::env::var(var) {
309 cmd.env(var, val);
310 }
311 }
312
313 cmd.arg("-p")
314 .arg(EXTRACTION_PROMPT)
315 .arg("--output-format")
316 .arg("json")
317 .arg("--json-schema")
318 .arg(EXTRACTION_SCHEMA)
319 .arg("--max-turns")
320 .arg("3")
321 .arg("--no-session-persistence");
322
323 if std::env::var("ANTHROPIC_API_KEY").is_ok() {
324 cmd.arg("--bare");
325 } else {
326 cmd.arg("--dangerously-skip-permissions")
327 .arg("--settings")
328 .arg(r#"{"hooks":{}}"#);
329 }
330
331 if let Some(m) = model {
332 cmd.arg("--model").arg(m);
333 }
334
335 cmd.stdin(Stdio::piped())
336 .stdout(Stdio::piped())
337 .stderr(Stdio::piped());
338
339 let mut child = cmd.spawn().map_err(|e| {
340 AppError::Io(std::io::Error::new(
341 e.kind(),
342 format!("failed to spawn claude: {e}"),
343 ))
344 })?;
345
346 let stdin_data = file_content.to_vec();
347 let mut child_stdin = child
348 .stdin
349 .take()
350 .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
351 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
352 child_stdin.write_all(&stdin_data)?;
353 Ok(())
354 });
355
356 let timeout = std::time::Duration::from_secs(timeout_secs);
357 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
358
359 match status {
360 Some(exit_status) => {
361 stdin_thread
362 .join()
363 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
364 .map_err(AppError::Io)?;
365
366 let mut stdout_buf = Vec::new();
367 let mut stderr_buf = Vec::new();
368 if let Some(mut out) = child.stdout.take() {
369 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
370 }
371 if let Some(mut err) = child.stderr.take() {
372 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
373 }
374
375 if !exit_status.success() {
376 let stdout_str = String::from_utf8_lossy(&stdout_buf);
377 if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
378 {
379 if let Some(re) = elements
380 .iter()
381 .find(|e| e.r#type.as_deref() == Some("result"))
382 {
383 if re.terminal_reason.as_deref() == Some("max_turns") {
384 tracing::warn!(
385 target: "ingest",
386 "extraction hit max_turns limit — hooks may have consumed turns"
387 );
388 return Err(AppError::Validation(
389 "claude -p hit max_turns: hooks may be consuming turns".into(),
390 ));
391 }
392 if re.is_error {
393 let err_msg = re
394 .error
395 .as_deref()
396 .or(re.result.as_deref())
397 .unwrap_or("unknown error");
398 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
399 return Err(AppError::Validation(format!(
400 "RATE_LIMITED: {err_msg}"
401 )));
402 }
403 if err_msg.contains("Not logged in")
404 || err_msg.contains("authentication")
405 {
406 tracing::warn!(
407 target: "ingest",
408 "Claude Code authentication failed. Re-authenticate interactively with: claude"
409 );
410 }
411 return Err(AppError::Validation(format!(
412 "claude -p failed: {err_msg}"
413 )));
414 }
415 }
416 }
417 let stderr_str = String::from_utf8_lossy(&stderr_buf);
418 if stderr_str.contains("auth") || stderr_str.contains("login") {
419 tracing::warn!(
420 target: "ingest",
421 "Claude Code authentication may have failed. Re-authenticate with: claude"
422 );
423 }
424 return Err(AppError::Validation(format!(
425 "claude -p exited with code {:?}: {}",
426 exit_status.code(),
427 stderr_str.trim()
428 )));
429 }
430
431 let stdout = String::from_utf8(stdout_buf)
432 .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
433 parse_claude_output(&stdout)
434 }
435 None => {
436 tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
437 let _ = child.kill();
438 let _ = child.wait();
439 let _ = stdin_thread.join();
440 Err(AppError::Validation(format!(
441 "claude -p timed out after {timeout_secs} seconds"
442 )))
443 }
444 }
445}
446
447fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64, bool), AppError> {
452 let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
453 AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
454 })?;
455
456 let is_oauth = elements
457 .iter()
458 .find(|e| e.r#type.as_deref() == Some("system") && e.subtype.as_deref() == Some("init"))
459 .and_then(|e| e.api_key_source.as_deref())
460 .map(|s| s == "none")
461 .unwrap_or(false);
462
463 let result_elem = elements
464 .iter()
465 .find(|e| e.r#type.as_deref() == Some("result"))
466 .ok_or_else(|| {
467 AppError::Validation("claude output missing 'result' element".to_string())
468 })?;
469
470 if result_elem.is_error {
471 let err_msg = result_elem
472 .error
473 .as_deref()
474 .or(result_elem.result.as_deref())
475 .unwrap_or("unknown error");
476 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
477 return Err(AppError::Validation(format!("RATE_LIMITED: {err_msg}")));
478 }
479 return Err(AppError::Validation(format!(
480 "claude extraction failed: {err_msg}"
481 )));
482 }
483
484 let extraction = result_elem
485 .structured_output
486 .clone()
487 .or_else(|| {
488 result_elem
489 .result
490 .as_ref()
491 .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
492 })
493 .ok_or_else(|| {
494 AppError::Validation("claude result missing structured_output and result field".into())
495 })?;
496
497 let cost = result_elem.total_cost_usd.unwrap_or(0.0);
498
499 Ok((extraction, cost, is_oauth))
500}
501
502fn emit_json<T: Serialize>(value: &T) {
503 if let Ok(json) = serde_json::to_string(value) {
504 let stdout = std::io::stdout();
505 let mut lock = stdout.lock();
506 let _ = writeln!(lock, "{json}");
507 let _ = lock.flush();
508 }
509}
510
511fn collect_matching_files(
513 dir: &Path,
514 pattern: &str,
515 recursive: bool,
516 max_files: usize,
517) -> Result<Vec<PathBuf>, AppError> {
518 let mut files = Vec::new();
519 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
520 files.sort();
521
522 if files.len() > max_files {
523 return Err(AppError::Validation(format!(
524 "found {} files, exceeds --max-files cap of {}",
525 files.len(),
526 max_files
527 )));
528 }
529
530 Ok(files)
531}
532
533fn open_queue_db(path: &str) -> Result<Connection, AppError> {
535 let conn = Connection::open(path)?;
536
537 conn.pragma_update(None, "journal_mode", "wal")?;
538
539 conn.execute_batch(
540 "CREATE TABLE IF NOT EXISTS queue (
541 id INTEGER PRIMARY KEY AUTOINCREMENT,
542 file_path TEXT NOT NULL UNIQUE,
543 name TEXT,
544 status TEXT NOT NULL DEFAULT 'pending',
545 memory_id INTEGER,
546 entities INTEGER DEFAULT 0,
547 rels INTEGER DEFAULT 0,
548 error TEXT,
549 cost_usd REAL DEFAULT 0.0,
550 attempt INTEGER DEFAULT 0,
551 elapsed_ms INTEGER,
552 created_at TEXT DEFAULT (datetime('now')),
553 done_at TEXT
554 );
555 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
556 )?;
557
558 Ok(conn)
559}
560
561pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
563 let started = Instant::now();
564
565 if !args.dir.exists() {
566 return Err(AppError::Validation(format!(
567 "directory not found: {}",
568 args.dir.display()
569 )));
570 }
571
572 let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
574 let version = validate_claude_version(&claude_binary)?;
575 tracing::info!(
576 target: "ingest",
577 binary = %claude_binary.display(),
578 version = %version,
579 "Claude Code binary validated"
580 );
581
582 emit_json(&PhaseEvent {
583 phase: "validate",
584 claude_path: claude_binary.to_str(),
585 version: Some(&version),
586 dir: None,
587 files_total: None,
588 files_new: None,
589 files_existing: None,
590 });
591
592 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
594
595 let queue_conn = open_queue_db(&args.queue_db)?;
596
597 if args.resume {
598 let reset = queue_conn
599 .execute(
600 "UPDATE queue SET status='pending' WHERE status='processing'",
601 [],
602 )
603 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
604 if reset > 0 {
605 tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
606 }
607 }
608
609 if args.retry_failed {
610 let count = queue_conn
611 .execute(
612 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
613 [],
614 )
615 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
616 tracing::info!(target: "ingest", count, "retrying failed files");
617 }
618
619 if !args.resume && !args.retry_failed {
620 queue_conn
621 .execute("DELETE FROM queue", [])
622 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
623 }
624
625 let mut new_count = 0usize;
626 let mut existing_count = 0usize;
627
628 if !args.retry_failed {
629 for file in &files {
630 let file_str = file.to_string_lossy().to_string();
631 let inserted = queue_conn
632 .execute(
633 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
634 rusqlite::params![file_str],
635 )
636 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
637 if inserted > 0 {
638 new_count += 1;
639 } else {
640 existing_count += 1;
641 }
642 }
643 }
644
645 emit_json(&PhaseEvent {
646 phase: "scan",
647 claude_path: None,
648 version: None,
649 dir: args.dir.to_str(),
650 files_total: Some(files.len()),
651 files_new: Some(new_count),
652 files_existing: Some(existing_count),
653 });
654
655 if args.dry_run {
656 for (idx, file) in files.iter().enumerate() {
657 let (name, _truncated, _orig) =
658 super::ingest::derive_kebab_name(file, args.max_name_length);
659 emit_json(&FileEvent {
660 file: &file.to_string_lossy(),
661 name: &name,
662 status: "preview",
663 memory_id: None,
664 entities: None,
665 rels: None,
666 cost_usd: None,
667 elapsed_ms: None,
668 error: None,
669 index: idx,
670 total: files.len(),
671 });
672 }
673 emit_json(&Summary {
674 summary: true,
675 files_total: files.len(),
676 completed: 0,
677 failed: 0,
678 skipped: 0,
679 entities_total: 0,
680 rels_total: 0,
681 cost_usd: 0.0,
682 elapsed_ms: started.elapsed().as_millis() as u64,
683 });
684 if !args.keep_queue {
685 let _ = std::fs::remove_file(&args.queue_db);
686 }
687 return Ok(());
688 }
689
690 let paths = AppPaths::resolve(args.db.as_deref())?;
692 ensure_db_ready(&paths)?;
693 let conn = open_rw(&paths.db)?;
694 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
695 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
696 let memory_type_str = args.r#type.as_str().to_string();
697
698 let mut completed = 0usize;
699 let mut failed = 0usize;
700 let skipped_initial: usize = queue_conn
701 .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
702 r.get::<_, usize>(0)
703 })
704 .unwrap_or(0);
705 let mut skipped = skipped_initial;
706 let mut entities_total = 0usize;
707 let mut rels_total = 0usize;
708 let mut cost_total = 0.0f64;
709 let mut oauth_detected = false;
710 let total = files.len();
711
712 let mut backoff_secs = args.rate_limit_wait;
713
714 loop {
715 let pending: Option<(i64, String)> = queue_conn
716 .query_row(
717 "UPDATE queue SET status='processing', attempt=attempt+1 \
718 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
719 RETURNING id, file_path",
720 [],
721 |row| Ok((row.get(0)?, row.get(1)?)),
722 )
723 .ok();
724
725 let (queue_id, file_path) = match pending {
726 Some(p) => p,
727 None => break,
728 };
729
730 let file_started = Instant::now();
731
732 const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
734 if let Ok(meta) = std::fs::metadata(&file_path) {
735 if meta.len() > MAX_FILE_SIZE {
736 let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
737 let _ = queue_conn.execute(
738 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
739 rusqlite::params![err_msg, queue_id],
740 );
741 let current_index = completed + failed + skipped;
742 failed += 1;
743 emit_json(&FileEvent {
744 file: &file_path,
745 name: "",
746 status: "failed",
747 memory_id: None,
748 entities: None,
749 rels: None,
750 cost_usd: None,
751 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
752 error: Some(&err_msg),
753 index: current_index,
754 total,
755 });
756 if args.fail_fast {
757 break;
758 }
759 continue;
760 }
761 }
762
763 let file_content = match std::fs::read(&file_path) {
764 Ok(c) => c,
765 Err(e) => {
766 let err_msg = format!("IO error: {e}");
767 let _ = queue_conn.execute(
768 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
769 rusqlite::params![err_msg, queue_id],
770 );
771 let current_index = completed + failed + skipped;
772 failed += 1;
773 emit_json(&FileEvent {
774 file: &file_path,
775 name: "",
776 status: "failed",
777 memory_id: None,
778 entities: None,
779 rels: None,
780 cost_usd: None,
781 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
782 error: Some(&err_msg),
783 index: current_index,
784 total,
785 });
786 if args.fail_fast {
787 break;
788 }
789 continue;
790 }
791 };
792
793 if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
795 let err_msg = format!(
796 "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
797 crate::constants::MAX_MEMORY_BODY_LEN,
798 file_content.len()
799 );
800 tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
801 let _ = queue_conn.execute(
802 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
803 rusqlite::params![err_msg, queue_id],
804 );
805 let current_index = completed + failed + skipped;
806 skipped += 1;
807 emit_json(&FileEvent {
808 file: &file_path,
809 name: "",
810 status: "skipped",
811 memory_id: None,
812 entities: None,
813 rels: None,
814 cost_usd: None,
815 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
816 error: Some(&err_msg),
817 index: current_index,
818 total,
819 });
820 continue;
821 }
822
823 let max_extract_attempts: u32 = 2;
825 let mut extraction_result: Option<(ExtractionResult, f64, bool)> = None;
826 let mut last_extract_err: Option<String> = None;
827
828 for attempt in 1..=max_extract_attempts {
829 match extract_with_claude(
830 &claude_binary,
831 &file_content,
832 args.claude_model.as_deref(),
833 args.claude_timeout,
834 ) {
835 Ok(result) => {
836 extraction_result = Some(result);
837 break;
838 }
839 Err(ref e) if format!("{e}").contains("RATE_LIMITED") => {
840 last_extract_err = Some(format!("{e}"));
841 break;
842 }
843 Err(e) => {
844 let msg = format!("{e}");
845 if attempt < max_extract_attempts {
846 tracing::warn!(target: "ingest", attempt, error = %msg, "extraction failed, retrying (cold-start workaround)");
847 std::thread::sleep(std::time::Duration::from_secs(2));
848 }
849 last_extract_err = Some(msg);
850 }
851 }
852 }
853
854 if let Some((extraction, cost, is_oauth)) = extraction_result {
855 if is_oauth && !oauth_detected {
856 oauth_detected = true;
857 tracing::info!(target: "ingest", "OAuth subscription detected — cost_usd omitted from output");
858 }
859 backoff_secs = args.rate_limit_wait;
860
861 let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
862 std::path::Path::new(&extraction.name),
863 args.max_name_length,
864 );
865 let name = &normalized_name;
866 let ent_count = extraction.entities.len();
867 let rel_count = extraction.relationships.len();
868
869 let new_entities: Vec<NewEntity> = extraction
870 .entities
871 .iter()
872 .filter_map(|e| match e.entity_type.parse::<EntityType>() {
873 Ok(et) => Some(NewEntity {
874 name: e.name.clone(),
875 entity_type: et,
876 description: None,
877 }),
878 Err(_) => {
879 tracing::warn!(
880 target: "ingest",
881 entity = %e.name,
882 entity_type = %e.entity_type,
883 "entity type not recognized, skipping"
884 );
885 None
886 }
887 })
888 .collect();
889
890 let new_relationships: Vec<NewRelationship> = extraction
891 .relationships
892 .iter()
893 .map(|r| NewRelationship {
894 source: r.source.clone(),
895 target: r.target.clone(),
896 relation: crate::parsers::normalize_relation(&r.relation),
897 strength: r.strength,
898 description: None,
899 })
900 .collect();
901
902 let body_str = String::from_utf8_lossy(&file_content);
903 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
904 let new_memory = NewMemory {
905 name: name.clone(),
906 namespace: namespace.clone(),
907 memory_type: memory_type_str.clone(),
908 description: extraction.description.clone(),
909 body: body_str.to_string(),
910 body_hash,
911 session_id: None,
912 source: "agent".to_string(),
913 metadata: serde_json::Value::Object(serde_json::Map::new()),
914 };
915
916 let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
918 Some((existing_id, is_deleted)) => {
919 if is_deleted {
920 memories::clear_deleted_at(&conn, existing_id)?;
921 }
922 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
923 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
924 rusqlite::params![existing_id],
925 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
926 )?;
927 memories::update(&conn, existing_id, &new_memory, None)?;
928 memories::sync_fts_after_update(
929 &conn,
930 existing_id,
931 &old_name,
932 &old_desc,
933 &old_body,
934 &new_memory.name,
935 &new_memory.description,
936 &new_memory.body,
937 )?;
938 tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
939 existing_id
940 }
941 None => match memories::insert(&conn, &new_memory) {
942 Ok(id) => id,
943 Err(e) => {
944 let err_msg = format!("{e}");
945 let _ = queue_conn.execute(
946 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
947 rusqlite::params![err_msg, queue_id],
948 );
949 let current_index = completed + failed + skipped;
950 failed += 1;
951 emit_json(&FileEvent {
952 file: &file_path,
953 name,
954 status: "failed",
955 memory_id: None,
956 entities: None,
957 rels: None,
958 cost_usd: if is_oauth { None } else { Some(cost) },
959 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
960 error: Some(&err_msg),
961 index: current_index,
962 total,
963 });
964 if !is_oauth {
965 cost_total += cost;
966 }
967 if args.fail_fast {
968 break;
969 }
970 continue;
971 }
972 },
973 };
974
975 for ent in &new_entities {
976 match entities::upsert_entity(&conn, &namespace, ent) {
977 Ok(eid) => {
978 let _ = entities::link_memory_entity(&conn, memory_id, eid);
979 }
980 Err(e) => {
981 tracing::warn!(
982 target: "ingest",
983 entity = %ent.name,
984 error = %e,
985 "entity skipped due to validation error"
986 );
987 }
988 }
989 }
990 for rel in &new_relationships {
991 crate::parsers::warn_if_non_canonical(&rel.relation);
992 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
993 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
994 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
995 let _ = conn.execute(
996 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
997 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
998 );
999 }
1000 }
1001
1002 let body_text = String::from_utf8_lossy(&file_content).into_owned();
1004 let snippet: String = body_text.chars().take(200).collect();
1005 let chunks_info =
1006 crate::chunking::split_into_chunks_hierarchical(&body_text, tokenizer);
1007
1008 let embedding_result = if chunks_info.len() <= 1 {
1009 crate::daemon::embed_passage_or_local(&paths.models, &body_text)
1010 } else {
1011 let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
1012 let mut multi_ok = true;
1013 for chunk in &chunks_info {
1014 let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
1015 match crate::daemon::embed_passage_or_local(&paths.models, chunk_text) {
1016 Ok(emb) => chunk_embeddings.push(emb),
1017 Err(e) => {
1018 tracing::warn!(
1019 target: "ingest",
1020 file = %file_path,
1021 error = %e,
1022 "chunk embedding failed, skipping vector index for this file"
1023 );
1024 multi_ok = false;
1025 break;
1026 }
1027 }
1028 }
1029 if multi_ok {
1030 let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
1031 if let Err(e) = crate::storage::chunks::insert_chunk_slices(
1033 &conn,
1034 memory_id,
1035 &body_text,
1036 &chunks_info,
1037 ) {
1038 tracing::warn!(
1039 target: "ingest",
1040 file = %file_path,
1041 error = %e,
1042 "chunk slice insert failed"
1043 );
1044 } else {
1045 for (i, emb) in chunk_embeddings.iter().enumerate() {
1046 if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
1047 &conn, i as i64, memory_id, i as i32, emb,
1048 ) {
1049 tracing::warn!(
1050 target: "ingest",
1051 file = %file_path,
1052 chunk = i,
1053 error = %e,
1054 "chunk vec upsert failed"
1055 );
1056 }
1057 }
1058 }
1059 Ok(aggregated)
1060 } else {
1061 crate::daemon::embed_passage_or_local(&paths.models, &body_text)
1063 }
1064 };
1065
1066 match embedding_result {
1067 Ok(embedding) => {
1068 if let Err(e) = memories::upsert_vec(
1069 &conn,
1070 memory_id,
1071 &namespace,
1072 &memory_type_str,
1073 &embedding,
1074 name,
1075 &snippet,
1076 ) {
1077 tracing::warn!(
1078 target: "ingest",
1079 file = %file_path,
1080 error = %e,
1081 "memory vec upsert failed; recall may not find this memory"
1082 );
1083 }
1084 for ent in &new_entities {
1086 if let Ok(Some(eid)) =
1087 entities::find_entity_id(&conn, &namespace, &ent.name)
1088 {
1089 let entity_text = ent.name.clone();
1090 match crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
1091 {
1092 Ok(emb) => {
1093 if let Err(e) = entities::upsert_entity_vec(
1094 &conn,
1095 eid,
1096 &namespace,
1097 ent.entity_type,
1098 &emb,
1099 &ent.name,
1100 ) {
1101 tracing::warn!(
1102 target: "ingest",
1103 entity = %ent.name,
1104 error = %e,
1105 "entity vec upsert failed"
1106 );
1107 }
1108 }
1109 Err(e) => {
1110 tracing::warn!(
1111 target: "ingest",
1112 entity = %ent.name,
1113 error = %e,
1114 "entity embedding failed"
1115 );
1116 }
1117 }
1118 }
1119 }
1120 }
1121 Err(e) => {
1122 tracing::warn!(
1123 target: "ingest",
1124 file = %file_path,
1125 error = %e,
1126 "memory embedding failed; recall will not find this memory"
1127 );
1128 }
1129 }
1130
1131 let _ = queue_conn.execute(
1132 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1133 rusqlite::params![
1134 name,
1135 memory_id,
1136 ent_count,
1137 rel_count,
1138 cost,
1139 file_started.elapsed().as_millis() as i64,
1140 queue_id
1141 ],
1142 );
1143
1144 let current_index = completed + failed + skipped;
1145 completed += 1;
1146 entities_total += ent_count;
1147 rels_total += rel_count;
1148 if !is_oauth {
1149 cost_total += cost;
1150 }
1151
1152 emit_json(&FileEvent {
1153 file: &file_path,
1154 name,
1155 status: "done",
1156 memory_id: Some(memory_id),
1157 entities: Some(ent_count),
1158 rels: Some(rel_count),
1159 cost_usd: if is_oauth { None } else { Some(cost) },
1160 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1161 error: None,
1162 index: current_index,
1163 total,
1164 });
1165 } else if let Some(ref err_str) = last_extract_err {
1166 if err_str.contains("RATE_LIMITED") {
1167 tracing::warn!(
1168 target: "ingest",
1169 wait_seconds = backoff_secs,
1170 "rate limited, waiting before retry"
1171 );
1172 let _ = queue_conn.execute(
1173 "UPDATE queue SET status='pending' WHERE id=?1",
1174 rusqlite::params![queue_id],
1175 );
1176 std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
1177 backoff_secs = (backoff_secs * 2).min(900);
1178 continue;
1179 } else {
1180 let _ = queue_conn.execute(
1181 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1182 rusqlite::params![err_str, queue_id],
1183 );
1184 let current_index = completed + failed + skipped;
1185 failed += 1;
1186 emit_json(&FileEvent {
1187 file: &file_path,
1188 name: "",
1189 status: "failed",
1190 memory_id: None,
1191 entities: None,
1192 rels: None,
1193 cost_usd: None,
1194 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1195 error: Some(err_str),
1196 index: current_index,
1197 total,
1198 });
1199 if args.fail_fast {
1200 break;
1201 }
1202 }
1203 }
1204
1205 if let Some(budget) = args.max_cost_usd {
1206 if oauth_detected {
1207 tracing::debug!(target: "ingest", "--max-cost-usd ignored: OAuth subscription detected");
1208 } else if cost_total >= budget {
1209 tracing::warn!(
1210 target: "ingest",
1211 spent = cost_total,
1212 budget = budget,
1213 "budget exceeded, stopping"
1214 );
1215 break;
1216 }
1217 }
1218 }
1219
1220 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1222 let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1223
1224 emit_json(&Summary {
1225 summary: true,
1226 files_total: total,
1227 completed,
1228 failed,
1229 skipped,
1230 entities_total,
1231 rels_total,
1232 cost_usd: cost_total,
1233 elapsed_ms: started.elapsed().as_millis() as u64,
1234 });
1235
1236 if !args.keep_queue && failed == 0 {
1237 let _ = std::fs::remove_file(&args.queue_db);
1238 }
1239
1240 Ok(())
1241}
1242
#[cfg(test)]
mod tests {
    use super::*;

    // RAII guard that snapshots a single environment variable and puts it back on drop.
    //
    // `Drop` also runs while a failing assertion unwinds, so a test that mutates the
    // process environment cannot leak that mutation into tests that run later in the
    // same process. A variable that was unset when captured is removed again instead
    // of being left holding the test value.
    //
    // NOTE(review): the environment is process-global, so tests running in parallel
    // threads can still observe the temporary value while the guarded test runs.
    struct EnvVarGuard {
        key: &'static str,
        original: Option<std::ffi::OsString>,
    }

    impl EnvVarGuard {
        /// Records the current value of `key` (or the fact that it is unset).
        fn capture(key: &'static str) -> Self {
            Self {
                key,
                original: std::env::var_os(key),
            }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.original.take() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }

    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost, _is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(err.to_string().contains("authentication failed"));
    }

    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        // The ingest loop matches on this marker to decide whether to back off and retry.
        assert!(err.to_string().contains("RATE_LIMITED"));
    }

    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    #[test]
    fn test_find_claude_binary_not_found() {
        // Guards restore both variables (or remove them if they were unset) even if
        // the assertion below panics.
        let _path_guard = EnvVarGuard::capture("PATH");
        let _override_guard = EnvVarGuard::capture("SQLITE_GRAPHRAG_CLAUDE_BINARY");

        std::env::set_var("PATH", "/nonexistent");
        std::env::remove_var("SQLITE_GRAPHRAG_CLAUDE_BINARY");

        let result = find_claude_binary(None);
        assert!(
            result.is_err(),
            "claude binary must not be found with an empty PATH and no override"
        );
    }

    #[test]
    fn test_parse_claude_output_result_fallback() {
        // When `structured_output` is null, the JSON-encoded `result` string is used instead.
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost, _is_oauth) =
            parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    #[test]
    fn test_terminal_reason_max_turns_detected() {
        // `terminal_reason: "max_turns"` without `is_error` is not a failure: the
        // structured output is present and must be returned intact.
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"terminal_reason":"max_turns","structured_output":{"name":"t","description":"d","entities":[],"relationships":[]}}
        ]"#;
        let (result, _cost, _is_oauth) = parse_claude_output(output)
            .expect("max_turns in result without is_error should still parse");
        assert_eq!(result.name, "t");
    }

    #[test]
    fn test_detect_oauth_from_init_json() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.50,"structured_output":{"name":"test-oauth","description":"oauth test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(is_oauth, "apiKeySource=none must be detected as OAuth");
        // The reported cost is still surfaced; callers decide whether to count it.
        assert!((cost - 0.50).abs() < f64::EPSILON);
    }

    #[test]
    fn test_api_key_source_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"env"},
            {"type":"result","is_error":false,"total_cost_usd":0.10,"structured_output":{"name":"test-api","description":"api test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "apiKeySource=env must NOT be detected as OAuth");
    }

    #[test]
    fn test_missing_api_key_source_defaults_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.05,"structured_output":{"name":"test-missing","description":"missing test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "missing apiKeySource must default to not OAuth");
    }

    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        // Every entity_type the model may emit must be parseable into `EntityType`,
        // otherwise extracted entities would be rejected at insert time.
        let schema: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t.as_str().expect("entity_type enum values must be strings");
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}