1use crate::commands::ingest::IngestArgs;
12use crate::entity_type::EntityType;
13use crate::errors::AppError;
14use crate::paths::AppPaths;
15use crate::storage::connection::{ensure_db_ready, open_rw};
16use crate::storage::entities::{self, NewEntity, NewRelationship};
17use crate::storage::memories::{self, NewMemory};
18
19use rusqlite::Connection;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::{Path, PathBuf};
23use std::process::{Command, Stdio};
24use std::time::Instant;
25
/// Minimum Claude Code version string for this integration (presumably the oldest release
/// supporting the CLI flags passed below — confirm).
///
/// `#[allow(dead_code)]` keeps the build warning-free in configurations where nothing reads it.
#[allow(dead_code)]
const MIN_CLAUDE_VERSION: &str = "2.1.0";
28
/// JSON Schema handed to `claude -p` through `--json-schema`.
///
/// It constrains the reply to the shape that `ExtractionResult` deserializes:
/// - a `name` and a `description`,
/// - a list of typed `entities`,
/// - a list of `relationships` weighted by `strength`.
///
/// The `entity_type` values are parsed as `EntityType` when results are stored. The `relation`
/// values mirror the list spelled out in `EXTRACTION_PROMPT`. Keep all three in sync when
/// editing.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"]
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"]
      }
    }
  },
  "required": ["name","description","entities","relationships"]
}"#;
67
/// Instruction text passed as the `-p` argument to `claude`; the document itself goes to the
/// child's stdin.
///
/// It asks for a kebab-case name, a one-sentence description, entities and weighted
/// relationships. It restates the allowed relation types that `EXTRACTION_SCHEMA` enforces.
///
/// Each source line ends in `\n\`. The literal therefore keeps explicit newlines, while the
/// source line break and the next line's leading whitespace are dropped.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
81
82#[derive(Debug, Deserialize)]
83struct ClaudeOutputElement {
84 r#type: Option<String>,
85 #[allow(dead_code)]
86 subtype: Option<String>,
87 #[serde(default)]
88 is_error: bool,
89 structured_output: Option<ExtractionResult>,
90 result: Option<String>,
91 total_cost_usd: Option<f64>,
92 error: Option<String>,
93}
94
95#[derive(Debug, Clone, Deserialize, Serialize)]
96pub struct ExtractionResult {
97 pub name: String,
98 pub description: String,
99 pub entities: Vec<ExtractedEntity>,
100 pub relationships: Vec<ExtractedRelationship>,
101}
102
103#[derive(Debug, Clone, Deserialize, Serialize)]
104pub struct ExtractedEntity {
105 pub name: String,
106 pub entity_type: String,
107}
108
109#[derive(Debug, Clone, Deserialize, Serialize)]
110pub struct ExtractedRelationship {
111 pub source: String,
112 pub target: String,
113 pub relation: String,
114 pub strength: f64,
115}
116
117#[derive(Debug, Serialize)]
118struct PhaseEvent<'a> {
119 phase: &'a str,
120 #[serde(skip_serializing_if = "Option::is_none")]
121 claude_path: Option<&'a str>,
122 #[serde(skip_serializing_if = "Option::is_none")]
123 version: Option<&'a str>,
124 #[serde(skip_serializing_if = "Option::is_none")]
125 dir: Option<&'a str>,
126 #[serde(skip_serializing_if = "Option::is_none")]
127 files_total: Option<usize>,
128 #[serde(skip_serializing_if = "Option::is_none")]
129 files_new: Option<usize>,
130 #[serde(skip_serializing_if = "Option::is_none")]
131 files_existing: Option<usize>,
132}
133
134#[derive(Debug, Serialize)]
135struct FileEvent<'a> {
136 file: &'a str,
137 name: &'a str,
138 status: &'a str,
139 #[serde(skip_serializing_if = "Option::is_none")]
140 memory_id: Option<i64>,
141 #[serde(skip_serializing_if = "Option::is_none")]
142 entities: Option<usize>,
143 #[serde(skip_serializing_if = "Option::is_none")]
144 rels: Option<usize>,
145 #[serde(skip_serializing_if = "Option::is_none")]
146 cost_usd: Option<f64>,
147 #[serde(skip_serializing_if = "Option::is_none")]
148 elapsed_ms: Option<u64>,
149 #[serde(skip_serializing_if = "Option::is_none")]
150 error: Option<&'a str>,
151 index: usize,
152 total: usize,
153}
154
155#[derive(Debug, Serialize)]
156struct Summary {
157 summary: bool,
158 files_total: usize,
159 completed: usize,
160 failed: usize,
161 skipped: usize,
162 entities_total: usize,
163 rels_total: usize,
164 cost_usd: f64,
165 elapsed_ms: u64,
166}
167
168pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
170 if let Some(p) = explicit {
171 if p.exists() {
172 return Ok(p.to_path_buf());
173 }
174 return Err(AppError::Validation(format!(
175 "Claude Code binary not found at explicit path: {}",
176 p.display()
177 )));
178 }
179
180 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
181 let p = PathBuf::from(&env_path);
182 if p.exists() {
183 return Ok(p);
184 }
185 }
186
187 let name = if cfg!(windows) {
188 "claude.exe"
189 } else {
190 "claude"
191 };
192 if let Some(path_var) = std::env::var_os("PATH") {
193 for dir in std::env::split_paths(&path_var) {
194 let candidate = dir.join(name);
195 if candidate.exists() {
196 return Ok(candidate);
197 }
198 }
199 }
200
201 Err(AppError::Validation(
202 "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
203 ))
204}
205
206fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
208 let output = Command::new(binary)
209 .arg("--version")
210 .stdin(Stdio::null())
211 .stdout(Stdio::piped())
212 .stderr(Stdio::piped())
213 .output()
214 .map_err(AppError::Io)?;
215
216 if !output.status.success() {
217 return Err(AppError::Validation(
218 "failed to run 'claude --version'".to_string(),
219 ));
220 }
221
222 let version_str = String::from_utf8(output.stdout)
223 .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
224 let version = version_str.trim().to_string();
225
226 Ok(version)
227}
228
229fn extract_with_claude(
236 binary: &Path,
237 file_content: &[u8],
238 model: Option<&str>,
239 timeout_secs: u64,
240) -> Result<(ExtractionResult, f64), AppError> {
241 use wait_timeout::ChildExt;
242
243 let mut cmd = Command::new(binary);
244
245 cmd.env_clear();
246 for var in &[
247 "PATH",
248 "HOME",
249 "USER",
250 "SHELL",
251 "TERM",
252 "LANG",
253 "XDG_CONFIG_HOME",
254 "XDG_DATA_HOME",
255 "XDG_RUNTIME_DIR",
256 "ANTHROPIC_API_KEY",
257 "CLAUDE_CONFIG_DIR",
258 "TMPDIR",
259 "TMP",
260 "TEMP",
261 "DYLD_FALLBACK_LIBRARY_PATH",
262 ] {
263 if let Ok(val) = std::env::var(var) {
264 cmd.env(var, val);
265 }
266 }
267
268 cmd.arg("-p")
269 .arg(EXTRACTION_PROMPT)
270 .arg("--output-format")
271 .arg("json")
272 .arg("--json-schema")
273 .arg(EXTRACTION_SCHEMA)
274 .arg("--max-turns")
275 .arg("3")
276 .arg("--no-session-persistence");
277
278 if std::env::var("ANTHROPIC_API_KEY").is_ok() {
279 cmd.arg("--bare");
280 } else {
281 cmd.arg("--dangerously-skip-permissions");
282 }
283
284 if let Some(m) = model {
285 cmd.arg("--model").arg(m);
286 }
287
288 cmd.stdin(Stdio::piped())
289 .stdout(Stdio::piped())
290 .stderr(Stdio::piped());
291
292 let mut child = cmd.spawn().map_err(|e| {
293 AppError::Io(std::io::Error::new(
294 e.kind(),
295 format!("failed to spawn claude: {e}"),
296 ))
297 })?;
298
299 let stdin_data = file_content.to_vec();
300 let mut child_stdin = child
301 .stdin
302 .take()
303 .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
304 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
305 child_stdin.write_all(&stdin_data)?;
306 Ok(())
307 });
308
309 let timeout = std::time::Duration::from_secs(timeout_secs);
310 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
311
312 match status {
313 Some(exit_status) => {
314 stdin_thread
315 .join()
316 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
317 .map_err(AppError::Io)?;
318
319 let mut stdout_buf = Vec::new();
320 let mut stderr_buf = Vec::new();
321 if let Some(mut out) = child.stdout.take() {
322 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
323 }
324 if let Some(mut err) = child.stderr.take() {
325 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
326 }
327
328 if !exit_status.success() {
329 let stdout_str = String::from_utf8_lossy(&stdout_buf);
330 if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
331 {
332 if let Some(re) = elements
333 .iter()
334 .find(|e| e.r#type.as_deref() == Some("result"))
335 {
336 if re.is_error {
337 let err_msg = re
338 .error
339 .as_deref()
340 .or(re.result.as_deref())
341 .unwrap_or("unknown error");
342 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
343 return Err(AppError::Validation(format!(
344 "RATE_LIMITED: {err_msg}"
345 )));
346 }
347 return Err(AppError::Validation(format!(
348 "claude -p failed: {err_msg}"
349 )));
350 }
351 }
352 }
353 let stderr_str = String::from_utf8_lossy(&stderr_buf);
354 return Err(AppError::Validation(format!(
355 "claude -p exited with code {:?}: {}",
356 exit_status.code(),
357 stderr_str.trim()
358 )));
359 }
360
361 let stdout = String::from_utf8(stdout_buf)
362 .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
363 parse_claude_output(&stdout)
364 }
365 None => {
366 tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
367 let _ = child.kill();
368 let _ = child.wait();
369 let _ = stdin_thread.join();
370 Err(AppError::Validation(format!(
371 "claude -p timed out after {timeout_secs} seconds"
372 )))
373 }
374 }
375}
376
377fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64), AppError> {
379 let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
380 AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
381 })?;
382
383 let result_elem = elements
384 .iter()
385 .find(|e| e.r#type.as_deref() == Some("result"))
386 .ok_or_else(|| {
387 AppError::Validation("claude output missing 'result' element".to_string())
388 })?;
389
390 if result_elem.is_error {
391 let err_msg = result_elem
392 .error
393 .as_deref()
394 .or(result_elem.result.as_deref())
395 .unwrap_or("unknown error");
396 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
397 return Err(AppError::Validation(format!("RATE_LIMITED: {err_msg}")));
398 }
399 return Err(AppError::Validation(format!(
400 "claude extraction failed: {err_msg}"
401 )));
402 }
403
404 let extraction = result_elem
405 .structured_output
406 .clone()
407 .or_else(|| {
408 result_elem
409 .result
410 .as_ref()
411 .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
412 })
413 .ok_or_else(|| {
414 AppError::Validation("claude result missing structured_output and result field".into())
415 })?;
416
417 let cost = result_elem.total_cost_usd.unwrap_or(0.0);
418
419 Ok((extraction, cost))
420}
421
422fn emit_json<T: Serialize>(value: &T) {
423 if let Ok(json) = serde_json::to_string(value) {
424 let stdout = std::io::stdout();
425 let mut lock = stdout.lock();
426 let _ = writeln!(lock, "{json}");
427 let _ = lock.flush();
428 }
429}
430
431fn collect_matching_files(
433 dir: &Path,
434 pattern: &str,
435 recursive: bool,
436 max_files: usize,
437) -> Result<Vec<PathBuf>, AppError> {
438 let mut files = Vec::new();
439 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
440 files.sort();
441
442 if files.len() > max_files {
443 return Err(AppError::Validation(format!(
444 "found {} files, exceeds --max-files cap of {}",
445 files.len(),
446 max_files
447 )));
448 }
449
450 Ok(files)
451}
452
453fn open_queue_db(path: &str) -> Result<Connection, AppError> {
455 let conn = Connection::open(path)?;
456
457 conn.execute_batch(
458 "CREATE TABLE IF NOT EXISTS queue (
459 id INTEGER PRIMARY KEY AUTOINCREMENT,
460 file_path TEXT NOT NULL UNIQUE,
461 name TEXT,
462 status TEXT NOT NULL DEFAULT 'pending',
463 memory_id INTEGER,
464 entities INTEGER DEFAULT 0,
465 rels INTEGER DEFAULT 0,
466 error TEXT,
467 cost_usd REAL DEFAULT 0.0,
468 attempt INTEGER DEFAULT 0,
469 elapsed_ms INTEGER,
470 created_at TEXT DEFAULT (datetime('now')),
471 done_at TEXT
472 );
473 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
474 )?;
475
476 Ok(conn)
477}
478
479pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
481 let started = Instant::now();
482
483 if !args.dir.exists() {
484 return Err(AppError::Validation(format!(
485 "directory not found: {}",
486 args.dir.display()
487 )));
488 }
489
490 let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
492 let version = validate_claude_version(&claude_binary)?;
493 tracing::info!(
494 target: "ingest",
495 binary = %claude_binary.display(),
496 version = %version,
497 "Claude Code binary validated"
498 );
499
500 emit_json(&PhaseEvent {
501 phase: "validate",
502 claude_path: claude_binary.to_str(),
503 version: Some(&version),
504 dir: None,
505 files_total: None,
506 files_new: None,
507 files_existing: None,
508 });
509
510 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
512
513 let queue_conn = open_queue_db(&args.queue_db)?;
514
515 if args.resume {
516 let reset = queue_conn
517 .execute(
518 "UPDATE queue SET status='pending' WHERE status='processing'",
519 [],
520 )
521 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
522 if reset > 0 {
523 tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
524 }
525 }
526
527 if args.retry_failed {
528 let count = queue_conn
529 .execute(
530 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
531 [],
532 )
533 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
534 tracing::info!(target: "ingest", count, "retrying failed files");
535 }
536
537 if !args.resume && !args.retry_failed {
538 queue_conn
539 .execute("DELETE FROM queue", [])
540 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
541 }
542
543 let mut new_count = 0usize;
544 let mut existing_count = 0usize;
545
546 if !args.retry_failed {
547 for file in &files {
548 let file_str = file.to_string_lossy().to_string();
549 let inserted = queue_conn
550 .execute(
551 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
552 rusqlite::params![file_str],
553 )
554 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
555 if inserted > 0 {
556 new_count += 1;
557 } else {
558 existing_count += 1;
559 }
560 }
561 }
562
563 emit_json(&PhaseEvent {
564 phase: "scan",
565 claude_path: None,
566 version: None,
567 dir: args.dir.to_str(),
568 files_total: Some(files.len()),
569 files_new: Some(new_count),
570 files_existing: Some(existing_count),
571 });
572
573 if args.dry_run {
574 for (idx, file) in files.iter().enumerate() {
575 let (name, _truncated, _orig) =
576 super::ingest::derive_kebab_name(file, args.max_name_length);
577 emit_json(&FileEvent {
578 file: &file.to_string_lossy(),
579 name: &name,
580 status: "preview",
581 memory_id: None,
582 entities: None,
583 rels: None,
584 cost_usd: None,
585 elapsed_ms: None,
586 error: None,
587 index: idx,
588 total: files.len(),
589 });
590 }
591 emit_json(&Summary {
592 summary: true,
593 files_total: files.len(),
594 completed: 0,
595 failed: 0,
596 skipped: 0,
597 entities_total: 0,
598 rels_total: 0,
599 cost_usd: 0.0,
600 elapsed_ms: started.elapsed().as_millis() as u64,
601 });
602 if !args.keep_queue {
603 let _ = std::fs::remove_file(&args.queue_db);
604 }
605 return Ok(());
606 }
607
608 let paths = AppPaths::resolve(args.db.as_deref())?;
610 ensure_db_ready(&paths)?;
611 let conn = open_rw(&paths.db)?;
612 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
613 let memory_type_str = args.r#type.as_str().to_string();
614
615 let mut completed = 0usize;
616 let mut failed = 0usize;
617 let skipped = 0usize;
618 let mut entities_total = 0usize;
619 let mut rels_total = 0usize;
620 let mut cost_total = 0.0f64;
621 let total = files.len();
622
623 let mut backoff_secs = args.rate_limit_wait;
624
625 loop {
626 let pending: Option<(i64, String)> = queue_conn
627 .query_row(
628 "UPDATE queue SET status='processing', attempt=attempt+1 \
629 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
630 RETURNING id, file_path",
631 [],
632 |row| Ok((row.get(0)?, row.get(1)?)),
633 )
634 .ok();
635
636 let (queue_id, file_path) = match pending {
637 Some(p) => p,
638 None => break,
639 };
640
641 let file_started = Instant::now();
642 let file_content = match std::fs::read(&file_path) {
643 Ok(c) => c,
644 Err(e) => {
645 let err_msg = format!("IO error: {e}");
646 let _ = queue_conn.execute(
647 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
648 rusqlite::params![err_msg, queue_id],
649 );
650 let current_index = completed + failed + skipped;
651 failed += 1;
652 emit_json(&FileEvent {
653 file: &file_path,
654 name: "",
655 status: "failed",
656 memory_id: None,
657 entities: None,
658 rels: None,
659 cost_usd: None,
660 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
661 error: Some(&err_msg),
662 index: current_index,
663 total,
664 });
665 if args.fail_fast {
666 break;
667 }
668 continue;
669 }
670 };
671
672 let max_extract_attempts: u32 = 2;
674 let mut extraction_result: Option<(ExtractionResult, f64)> = None;
675 let mut last_extract_err: Option<String> = None;
676
677 for attempt in 1..=max_extract_attempts {
678 match extract_with_claude(
679 &claude_binary,
680 &file_content,
681 args.claude_model.as_deref(),
682 args.claude_timeout,
683 ) {
684 Ok(result) => {
685 extraction_result = Some(result);
686 break;
687 }
688 Err(ref e) if format!("{e}").contains("RATE_LIMITED") => {
689 last_extract_err = Some(format!("{e}"));
690 break;
691 }
692 Err(e) => {
693 let msg = format!("{e}");
694 if attempt < max_extract_attempts {
695 tracing::warn!(target: "ingest", attempt, error = %msg, "extraction failed, retrying (cold-start workaround)");
696 std::thread::sleep(std::time::Duration::from_secs(2));
697 }
698 last_extract_err = Some(msg);
699 }
700 }
701 }
702
703 if let Some((extraction, cost)) = extraction_result {
704 backoff_secs = args.rate_limit_wait;
705
706 let name = &extraction.name;
707 let ent_count = extraction.entities.len();
708 let rel_count = extraction.relationships.len();
709
710 let new_entities: Vec<NewEntity> = extraction
711 .entities
712 .iter()
713 .filter_map(|e| match e.entity_type.parse::<EntityType>() {
714 Ok(et) => Some(NewEntity {
715 name: e.name.clone(),
716 entity_type: et,
717 description: None,
718 }),
719 Err(_) => {
720 tracing::warn!(
721 target: "ingest",
722 entity = %e.name,
723 entity_type = %e.entity_type,
724 "entity type not recognized, skipping"
725 );
726 None
727 }
728 })
729 .collect();
730
731 let new_relationships: Vec<NewRelationship> = extraction
732 .relationships
733 .iter()
734 .map(|r| NewRelationship {
735 source: r.source.clone(),
736 target: r.target.clone(),
737 relation: r.relation.clone(),
738 strength: r.strength,
739 description: None,
740 })
741 .collect();
742
743 let body_str = String::from_utf8_lossy(&file_content);
744 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
745 let new_memory = NewMemory {
746 name: name.clone(),
747 namespace: namespace.clone(),
748 memory_type: memory_type_str.clone(),
749 description: extraction.description.clone(),
750 body: body_str.to_string(),
751 body_hash,
752 session_id: None,
753 source: "agent".to_string(),
754 metadata: serde_json::Value::Object(serde_json::Map::new()),
755 };
756
757 let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
759 Some((existing_id, is_deleted)) => {
760 if is_deleted {
761 memories::clear_deleted_at(&conn, existing_id)?;
762 }
763 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
764 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
765 rusqlite::params![existing_id],
766 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
767 )?;
768 memories::update(&conn, existing_id, &new_memory, None)?;
769 memories::sync_fts_after_update(
770 &conn,
771 existing_id,
772 &old_name,
773 &old_desc,
774 &old_body,
775 &new_memory.name,
776 &new_memory.description,
777 &new_memory.body,
778 )?;
779 tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
780 existing_id
781 }
782 None => match memories::insert(&conn, &new_memory) {
783 Ok(id) => id,
784 Err(e) => {
785 let err_msg = format!("{e}");
786 let _ = queue_conn.execute(
787 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
788 rusqlite::params![err_msg, queue_id],
789 );
790 let current_index = completed + failed + skipped;
791 failed += 1;
792 emit_json(&FileEvent {
793 file: &file_path,
794 name,
795 status: "failed",
796 memory_id: None,
797 entities: None,
798 rels: None,
799 cost_usd: Some(cost),
800 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
801 error: Some(&err_msg),
802 index: current_index,
803 total,
804 });
805 cost_total += cost;
806 if args.fail_fast {
807 break;
808 }
809 continue;
810 }
811 },
812 };
813
814 for ent in &new_entities {
815 if let Ok(eid) = entities::upsert_entity(&conn, &namespace, ent) {
816 let _ = entities::link_memory_entity(&conn, memory_id, eid);
817 }
818 }
819 for rel in &new_relationships {
820 crate::parsers::warn_if_non_canonical(&rel.relation);
821 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
822 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
823 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
824 let _ = conn.execute(
825 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
826 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
827 );
828 }
829 }
830
831 let _ = queue_conn.execute(
832 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
833 rusqlite::params![
834 name,
835 memory_id,
836 ent_count,
837 rel_count,
838 cost,
839 file_started.elapsed().as_millis() as i64,
840 queue_id
841 ],
842 );
843
844 let current_index = completed + failed + skipped;
845 completed += 1;
846 entities_total += ent_count;
847 rels_total += rel_count;
848 cost_total += cost;
849
850 emit_json(&FileEvent {
851 file: &file_path,
852 name,
853 status: "done",
854 memory_id: Some(memory_id),
855 entities: Some(ent_count),
856 rels: Some(rel_count),
857 cost_usd: Some(cost),
858 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
859 error: None,
860 index: current_index,
861 total,
862 });
863 } else if let Some(ref err_str) = last_extract_err {
864 if err_str.contains("RATE_LIMITED") {
865 tracing::warn!(
866 target: "ingest",
867 wait_seconds = backoff_secs,
868 "rate limited, waiting before retry"
869 );
870 let _ = queue_conn.execute(
871 "UPDATE queue SET status='pending' WHERE id=?1",
872 rusqlite::params![queue_id],
873 );
874 std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
875 backoff_secs = (backoff_secs * 2).min(900);
876 continue;
877 } else {
878 let _ = queue_conn.execute(
879 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
880 rusqlite::params![err_str, queue_id],
881 );
882 let current_index = completed + failed + skipped;
883 failed += 1;
884 emit_json(&FileEvent {
885 file: &file_path,
886 name: "",
887 status: "failed",
888 memory_id: None,
889 entities: None,
890 rels: None,
891 cost_usd: None,
892 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
893 error: Some(err_str),
894 index: current_index,
895 total,
896 });
897 if args.fail_fast {
898 break;
899 }
900 }
901 }
902
903 if let Some(budget) = args.max_cost_usd {
904 if cost_total >= budget {
905 tracing::warn!(
906 target: "ingest",
907 spent = cost_total,
908 budget = budget,
909 "budget exceeded, stopping"
910 );
911 break;
912 }
913 }
914 }
915
916 emit_json(&Summary {
918 summary: true,
919 files_total: total,
920 completed,
921 failed,
922 skipped,
923 entities_total,
924 rels_total,
925 cost_usd: cost_total,
926 elapsed_ms: started.elapsed().as_millis() as u64,
927 });
928
929 if !args.keep_queue && failed == 0 {
930 let _ = std::fs::remove_file(&args.queue_db);
931 }
932
933 Ok(())
934}
935
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("RATE_LIMITED"));
    }

    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    #[test]
    fn test_find_claude_binary_not_found() {
        // Exercise the explicit-path branch instead of rewriting PATH. Cargo runs tests on
        // parallel threads of one process, so mutating process-wide environment variables
        // here could break concurrently running tests that spawn commands.
        let missing = Path::new("/nonexistent/sqlite-graphrag-test/claude");
        let err = find_claude_binary(Some(missing)).unwrap_err();
        assert!(
            format!("{err}").contains("explicit path"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_find_claude_binary_explicit_existing_file() {
        // The running test executable is guaranteed to be an existing regular file.
        let exe = std::env::current_exe().expect("test binary path must be known");
        let found = find_claude_binary(Some(exe.as_path()))
            .expect("an existing explicit path must be accepted");
        assert_eq!(found, exe);
    }

    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost) = parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value = serde_json::from_str(EXTRACTION_SCHEMA).unwrap();
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t.as_str().unwrap();
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}