1use crate::commands::ingest::IngestArgs;
12use crate::entity_type::EntityType;
13use crate::errors::AppError;
14use crate::paths::AppPaths;
15use crate::storage::connection::{ensure_db_ready, open_rw};
16use crate::storage::entities::{self, NewEntity, NewRelationship};
17use crate::storage::memories::{self, NewMemory};
18
19use rusqlite::Connection;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::{Path, PathBuf};
23use std::process::{Command, Stdio};
24use std::time::Instant;
25
/// Minimum Claude Code release expected by this module, as a dotted
/// `MAJOR.MINOR.PATCH` string.
#[allow(dead_code)]
const MIN_CLAUDE_VERSION: &str = "2.1.0";
28
/// JSON Schema passed to `claude --json-schema` to constrain the structured
/// output of each extraction run.
///
/// Its shape mirrors [`ExtractionResult`]: a document `name` and
/// `description`, a list of typed `entities`, and a list of `relationships`
/// whose `relation` is restricted to a fixed vocabulary and whose `strength`
/// lies in `[0, 1]`.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"]
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"]
      }
    }
  },
  "required": ["name","description","entities","relationships"]
}"#;
67
/// Prompt passed via `claude -p` for every document; the document itself is
/// written to the child's stdin.
///
/// The entity types and relationship names listed here must stay in sync with
/// the enums in `EXTRACTION_SCHEMA`. Relationship endpoints are required to be
/// names of extracted entities because, when the graph is persisted, a
/// relationship whose source or target does not resolve to an entity is
/// dropped.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
    1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
    2. A one-sentence description (10-20 words) summarizing the key insight\n\
    3. Domain-specific entities, each typed as one of: project, tool, person, file, concept, incident, decision, organization, location, date\n\
    4. Typed relationships between entities with strength scores\n\n\
    Rules:\n\
    - Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
    - NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
    - Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
    - Relationship source and target MUST each exactly match the name of an extracted entity\n\
    - NEVER use 'mentions' as relationship type\n\
    - Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
    - Prefer fewer high-quality entities over many low-quality ones\n\
    - Description must answer: What is this about and WHY does it matter?";
81
/// One message object from the JSON printed by `claude -p --output-format json`.
///
/// Only the fields this module reads are declared; serde ignores any other
/// keys. The element whose `type` is `"result"` carries the outcome of the run.
#[derive(Debug, Deserialize)]
struct ClaudeOutputElement {
    /// Message kind; `"result"` marks the outcome element.
    r#type: Option<String>,
    /// Sub-kind reported alongside `type`.
    #[allow(dead_code)]
    subtype: Option<String>,
    /// Whether the run ended in an error; `false` when the key is absent.
    #[serde(default)]
    is_error: bool,
    /// Schema-constrained extraction; expected on a successful result.
    structured_output: Option<ExtractionResult>,
    /// Reported cost of the run in US dollars.
    total_cost_usd: Option<f64>,
    /// Error text accompanying a failed result, when provided.
    error: Option<String>,
}
93
/// Structured extraction Claude returns for one document; its shape mirrors
/// `EXTRACTION_SCHEMA`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Short name for the document (the prompt asks for kebab-case); used as
    /// the memory name.
    pub name: String,
    /// One-sentence summary; stored as the memory description.
    pub description: String,
    /// Entities identified in the document.
    pub entities: Vec<ExtractedEntity>,
    /// Typed relationships between entities, referenced by entity name.
    pub relationships: Vec<ExtractedRelationship>,
}
101
/// An entity named in an [`ExtractionResult`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedEntity {
    /// Entity name (the prompt asks for lowercase kebab-case).
    pub name: String,
    /// Raw type label; parsed into `EntityType` before persisting, and
    /// entities whose label does not parse are skipped.
    pub entity_type: String,
}
107
/// A directed, typed link between two extracted entities, referenced by name.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedRelationship {
    /// Name of the source entity.
    pub source: String,
    /// Name of the target entity.
    pub target: String,
    /// Relation label, expected to be one of the values enumerated in
    /// `EXTRACTION_SCHEMA`.
    pub relation: String,
    /// Strength in `[0, 1]` (per the schema); stored as the relationship weight.
    pub strength: f64,
}
115
/// NDJSON progress line announcing a pipeline phase (`"validate"` or `"scan"`).
///
/// Fields that do not apply to the phase are `None` and omitted from the JSON.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label.
    phase: &'a str,
    /// Resolved path of the Claude binary (validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    claude_path: Option<&'a str>,
    /// Trimmed output of `claude --version` (validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Directory being scanned (scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of files matched by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Files newly added to the work queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Files that were already present in the work queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
132
/// NDJSON progress line emitted once per processed file.
///
/// Optional fields are omitted from the JSON when `None`.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the file, as stored in the work queue.
    file: &'a str,
    /// Memory name extracted for the file; empty when none was produced.
    name: &'a str,
    /// Outcome label, e.g. `"done"` or `"failed"`.
    status: &'a str,
    /// Id of the inserted memory, when one was created.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of entities recorded for the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships recorded for the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Cost reported for the Claude call, in US dollars.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on the file, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Failure reason, when the file failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Position of the file among those handled in this run.
    index: usize,
    /// Number of files found by the scan.
    total: usize,
}
153
/// Final NDJSON line summarising the whole ingestion run.
#[derive(Debug, Serialize)]
struct Summary {
    /// Always `true`; lets consumers tell this line apart from progress events.
    summary: bool,
    /// Number of files found by the scan.
    files_total: usize,
    /// Files processed successfully in this run.
    completed: usize,
    /// Files that failed in this run.
    failed: usize,
    /// Files skipped in this run.
    skipped: usize,
    /// Sum of per-file entity counts.
    entities_total: usize,
    /// Sum of per-file relationship counts.
    rels_total: usize,
    /// Total reported Claude cost, in US dollars.
    cost_usd: f64,
    /// Wall-clock duration of the whole run, in milliseconds.
    elapsed_ms: u64,
}
166
167pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
169 if let Some(p) = explicit {
170 if p.exists() {
171 return Ok(p.to_path_buf());
172 }
173 return Err(AppError::Validation(format!(
174 "Claude Code binary not found at explicit path: {}",
175 p.display()
176 )));
177 }
178
179 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
180 let p = PathBuf::from(&env_path);
181 if p.exists() {
182 return Ok(p);
183 }
184 }
185
186 let name = if cfg!(windows) {
187 "claude.exe"
188 } else {
189 "claude"
190 };
191 if let Some(path_var) = std::env::var_os("PATH") {
192 for dir in std::env::split_paths(&path_var) {
193 let candidate = dir.join(name);
194 if candidate.exists() {
195 return Ok(candidate);
196 }
197 }
198 }
199
200 Err(AppError::Validation(
201 "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
202 ))
203}
204
205fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
207 let output = Command::new(binary)
208 .arg("--version")
209 .stdin(Stdio::null())
210 .stdout(Stdio::piped())
211 .stderr(Stdio::piped())
212 .output()
213 .map_err(AppError::Io)?;
214
215 if !output.status.success() {
216 return Err(AppError::Validation(
217 "failed to run 'claude --version'".to_string(),
218 ));
219 }
220
221 let version_str = String::from_utf8(output.stdout)
222 .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
223 let version = version_str.trim().to_string();
224
225 Ok(version)
226}
227
228fn extract_with_claude(
230 binary: &Path,
231 file_content: &[u8],
232 model: Option<&str>,
233) -> Result<(ExtractionResult, f64), AppError> {
234 let mut cmd = Command::new(binary);
235 cmd.arg("-p")
236 .arg(EXTRACTION_PROMPT)
237 .arg("--output-format")
238 .arg("json")
239 .arg("--json-schema")
240 .arg(EXTRACTION_SCHEMA)
241 .arg("--max-turns")
242 .arg("1")
243 .arg("--no-session-persistence")
244 .arg("--bare")
245 .stdin(Stdio::piped())
246 .stdout(Stdio::piped())
247 .stderr(Stdio::piped());
248
249 if let Some(m) = model {
250 cmd.arg("--model").arg(m);
251 }
252
253 let mut child = cmd.spawn().map_err(|e| {
254 AppError::Io(std::io::Error::new(
255 e.kind(),
256 format!("failed to spawn claude: {e}"),
257 ))
258 })?;
259
260 if let Some(mut stdin) = child.stdin.take() {
261 stdin.write_all(file_content).map_err(AppError::Io)?;
262 }
263
264 let output = child.wait_with_output().map_err(AppError::Io)?;
265
266 if !output.status.success() {
267 let stderr = String::from_utf8_lossy(&output.stderr);
268 return Err(AppError::Validation(format!(
269 "claude -p exited with code {:?}: {}",
270 output.status.code(),
271 stderr.trim()
272 )));
273 }
274
275 let stdout = String::from_utf8(output.stdout)
276 .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".to_string()))?;
277
278 parse_claude_output(&stdout)
279}
280
281fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64), AppError> {
283 let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
284 AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
285 })?;
286
287 let result_elem = elements
288 .iter()
289 .find(|e| e.r#type.as_deref() == Some("result"))
290 .ok_or_else(|| {
291 AppError::Validation("claude output missing 'result' element".to_string())
292 })?;
293
294 if result_elem.is_error {
295 let err_msg = result_elem.error.as_deref().unwrap_or("unknown error");
296 if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
297 return Err(AppError::Validation(format!("RATE_LIMITED: {err_msg}")));
298 }
299 return Err(AppError::Validation(format!(
300 "claude extraction failed: {err_msg}"
301 )));
302 }
303
304 let extraction = result_elem.structured_output.clone().ok_or_else(|| {
305 AppError::Validation("claude result missing structured_output".to_string())
306 })?;
307
308 let cost = result_elem.total_cost_usd.unwrap_or(0.0);
309
310 Ok((extraction, cost))
311}
312
313fn emit_json<T: Serialize>(value: &T) {
314 if let Ok(json) = serde_json::to_string(value) {
315 let stdout = std::io::stdout();
316 let mut lock = stdout.lock();
317 let _ = writeln!(lock, "{json}");
318 let _ = lock.flush();
319 }
320}
321
322fn collect_matching_files(
324 dir: &Path,
325 pattern: &str,
326 recursive: bool,
327 max_files: usize,
328) -> Result<Vec<PathBuf>, AppError> {
329 let mut files = Vec::new();
330 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
331 files.sort();
332
333 if files.len() > max_files {
334 return Err(AppError::Validation(format!(
335 "found {} files, exceeds --max-files cap of {}",
336 files.len(),
337 max_files
338 )));
339 }
340
341 Ok(files)
342}
343
344fn open_queue_db(path: &str) -> Result<Connection, AppError> {
346 let conn = Connection::open(path)?;
347
348 conn.execute_batch(
349 "CREATE TABLE IF NOT EXISTS queue (
350 id INTEGER PRIMARY KEY AUTOINCREMENT,
351 file_path TEXT NOT NULL UNIQUE,
352 name TEXT,
353 status TEXT NOT NULL DEFAULT 'pending',
354 memory_id INTEGER,
355 entities INTEGER DEFAULT 0,
356 rels INTEGER DEFAULT 0,
357 error TEXT,
358 cost_usd REAL DEFAULT 0.0,
359 attempt INTEGER DEFAULT 0,
360 elapsed_ms INTEGER,
361 created_at TEXT DEFAULT (datetime('now')),
362 done_at TEXT
363 );
364 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
365 )?;
366
367 Ok(conn)
368}
369
370pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
372 let started = Instant::now();
373
374 if !args.dir.exists() {
375 return Err(AppError::Validation(format!(
376 "directory not found: {}",
377 args.dir.display()
378 )));
379 }
380
381 let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
383 let version = validate_claude_version(&claude_binary)?;
384 tracing::info!(
385 target: "ingest",
386 binary = %claude_binary.display(),
387 version = %version,
388 "Claude Code binary validated"
389 );
390
391 emit_json(&PhaseEvent {
392 phase: "validate",
393 claude_path: claude_binary.to_str(),
394 version: Some(&version),
395 dir: None,
396 files_total: None,
397 files_new: None,
398 files_existing: None,
399 });
400
401 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
403
404 let queue_conn = open_queue_db(&args.queue_db)?;
405
406 let mut new_count = 0usize;
407 let mut existing_count = 0usize;
408
409 for file in &files {
410 let file_str = file.to_string_lossy().to_string();
411 let inserted = queue_conn
412 .execute(
413 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
414 rusqlite::params![file_str],
415 )
416 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
417 if inserted > 0 {
418 new_count += 1;
419 } else {
420 existing_count += 1;
421 }
422 }
423
424 emit_json(&PhaseEvent {
425 phase: "scan",
426 claude_path: None,
427 version: None,
428 dir: args.dir.to_str(),
429 files_total: Some(files.len()),
430 files_new: Some(new_count),
431 files_existing: Some(existing_count),
432 });
433
434 let paths = AppPaths::resolve(args.db.as_deref())?;
436 ensure_db_ready(&paths)?;
437 let conn = open_rw(&paths.db)?;
438 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
439 let memory_type_str = args.r#type.as_str().to_string();
440
441 let mut completed = 0usize;
442 let mut failed = 0usize;
443 let skipped = 0usize;
444 let mut entities_total = 0usize;
445 let mut rels_total = 0usize;
446 let mut cost_total = 0.0f64;
447 let total = files.len();
448
449 let mut backoff_secs = args.rate_limit_wait;
450
451 loop {
452 let pending: Option<(i64, String)> = queue_conn
453 .query_row(
454 "UPDATE queue SET status='processing', attempt=attempt+1 \
455 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
456 RETURNING id, file_path",
457 [],
458 |row| Ok((row.get(0)?, row.get(1)?)),
459 )
460 .ok();
461
462 let (queue_id, file_path) = match pending {
463 Some(p) => p,
464 None => break,
465 };
466
467 let file_started = Instant::now();
468 let file_content = match std::fs::read(&file_path) {
469 Ok(c) => c,
470 Err(e) => {
471 let err_msg = format!("IO error: {e}");
472 let _ = queue_conn.execute(
473 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
474 rusqlite::params![err_msg, queue_id],
475 );
476 failed += 1;
477 emit_json(&FileEvent {
478 file: &file_path,
479 name: "",
480 status: "failed",
481 memory_id: None,
482 entities: None,
483 rels: None,
484 cost_usd: None,
485 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
486 error: Some(&err_msg),
487 index: completed + failed + skipped,
488 total,
489 });
490 if args.fail_fast {
491 break;
492 }
493 continue;
494 }
495 };
496
497 match extract_with_claude(&claude_binary, &file_content, args.claude_model.as_deref()) {
498 Ok((extraction, cost)) => {
499 backoff_secs = args.rate_limit_wait;
500
501 let name = &extraction.name;
502 let ent_count = extraction.entities.len();
503 let rel_count = extraction.relationships.len();
504
505 let new_entities: Vec<NewEntity> = extraction
506 .entities
507 .iter()
508 .filter_map(|e| {
509 e.entity_type
510 .parse::<EntityType>()
511 .ok()
512 .map(|et| NewEntity {
513 name: e.name.clone(),
514 entity_type: et,
515 description: None,
516 })
517 })
518 .collect();
519
520 let new_relationships: Vec<NewRelationship> = extraction
521 .relationships
522 .iter()
523 .map(|r| NewRelationship {
524 source: r.source.clone(),
525 target: r.target.clone(),
526 relation: r.relation.clone(),
527 strength: r.strength,
528 description: None,
529 })
530 .collect();
531
532 let body_str = String::from_utf8_lossy(&file_content);
533 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
534 let new_memory = NewMemory {
535 name: name.clone(),
536 namespace: namespace.clone(),
537 memory_type: memory_type_str.clone(),
538 description: extraction.description.clone(),
539 body: body_str.to_string(),
540 body_hash,
541 session_id: None,
542 source: "claude-code".to_string(),
543 metadata: serde_json::Value::Object(serde_json::Map::new()),
544 };
545
546 let memory_id = match memories::insert(&conn, &new_memory) {
547 Ok(id) => id,
548 Err(e) => {
549 let err_msg = format!("{e}");
550 let _ = queue_conn.execute(
551 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
552 rusqlite::params![err_msg, queue_id],
553 );
554 failed += 1;
555 emit_json(&FileEvent {
556 file: &file_path,
557 name,
558 status: "failed",
559 memory_id: None,
560 entities: None,
561 rels: None,
562 cost_usd: Some(cost),
563 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
564 error: Some(&err_msg),
565 index: completed + failed + skipped,
566 total,
567 });
568 cost_total += cost;
569 if args.fail_fast {
570 break;
571 }
572 continue;
573 }
574 };
575
576 for ent in &new_entities {
577 if let Ok(eid) = entities::upsert_entity(&conn, &namespace, ent) {
578 let _ = entities::link_memory_entity(&conn, memory_id, eid);
579 }
580 }
581 for rel in &new_relationships {
582 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
583 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
584 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
585 let _ = conn.execute(
586 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
587 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
588 );
589 }
590 }
591
592 let _ = queue_conn.execute(
593 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
594 rusqlite::params![
595 name,
596 memory_id,
597 ent_count,
598 rel_count,
599 cost,
600 file_started.elapsed().as_millis() as i64,
601 queue_id
602 ],
603 );
604
605 completed += 1;
606 entities_total += ent_count;
607 rels_total += rel_count;
608 cost_total += cost;
609
610 emit_json(&FileEvent {
611 file: &file_path,
612 name,
613 status: "done",
614 memory_id: Some(memory_id),
615 entities: Some(ent_count),
616 rels: Some(rel_count),
617 cost_usd: Some(cost),
618 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
619 error: None,
620 index: completed + failed + skipped - 1,
621 total,
622 });
623 }
624 Err(ref e) if format!("{e}").contains("RATE_LIMITED") => {
625 tracing::warn!(
626 target: "ingest",
627 wait_seconds = backoff_secs,
628 "rate limited, waiting before retry"
629 );
630 let _ = queue_conn.execute(
631 "UPDATE queue SET status='pending' WHERE id=?1",
632 rusqlite::params![queue_id],
633 );
634 std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
635 backoff_secs = (backoff_secs * 2).min(900);
636 continue;
637 }
638 Err(e) => {
639 let err_msg = format!("{e}");
640 let _ = queue_conn.execute(
641 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
642 rusqlite::params![err_msg, queue_id],
643 );
644 failed += 1;
645 emit_json(&FileEvent {
646 file: &file_path,
647 name: "",
648 status: "failed",
649 memory_id: None,
650 entities: None,
651 rels: None,
652 cost_usd: None,
653 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
654 error: Some(&err_msg),
655 index: completed + failed + skipped,
656 total,
657 });
658 if args.fail_fast {
659 break;
660 }
661 }
662 }
663
664 if let Some(budget) = args.max_cost_usd {
665 if cost_total >= budget {
666 tracing::warn!(
667 target: "ingest",
668 spent = cost_total,
669 budget = budget,
670 "budget exceeded, stopping"
671 );
672 break;
673 }
674 }
675 }
676
677 emit_json(&Summary {
679 summary: true,
680 files_total: total,
681 completed,
682 failed,
683 skipped,
684 entities_total,
685 rels_total,
686 cost_usd: cost_total,
687 elapsed_ms: started.elapsed().as_millis() as u64,
688 });
689
690 if !args.keep_queue && failed == 0 {
691 let _ = std::fs::remove_file(&args.queue_db);
692 }
693
694 Ok(())
695}
696
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// The prompt spells out the allowed relation names; keep it in sync with
    /// the schema enum so the model is never asked for a forbidden value.
    #[test]
    fn test_prompt_lists_every_schema_relation() {
        let schema: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
        let relations = schema["properties"]["relationships"]["items"]["properties"]["relation"]
            ["enum"]
            .as_array()
            .expect("relation enum must exist");
        for rel in relations {
            let rel = rel.as_str().expect("relation names are strings");
            assert!(
                EXTRACTION_PROMPT.contains(rel),
                "prompt does not mention relation {rel}"
            );
        }
    }

    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("RATE_LIMITED"));
    }

    #[test]
    fn test_parse_claude_output_missing_result_element() {
        let output = r#"[{"type":"system","subtype":"init"},{"type":"assistant"}]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("missing 'result'"));
    }

    #[test]
    fn test_parse_claude_output_missing_structured_output() {
        let output = r#"[{"type":"result","is_error":false,"total_cost_usd":0.01}]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("structured_output"));
    }

    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    #[test]
    fn test_find_claude_binary_explicit_path_missing() {
        let missing = Path::new("/nonexistent/claude-binary-for-tests");
        let err = find_claude_binary(Some(missing)).unwrap_err();
        assert!(format!("{err}").contains("explicit path"));
    }

    /// Temporarily rewrites process-global environment variables. Other tests
    /// in this binary that spawn processes may observe the temporary values
    /// while this test runs.
    #[test]
    fn test_find_claude_binary_not_found() {
        let original_path = std::env::var_os("PATH");
        let original_override = std::env::var_os("SQLITE_GRAPHRAG_CLAUDE_BINARY");
        std::env::set_var("PATH", "/nonexistent");
        std::env::remove_var("SQLITE_GRAPHRAG_CLAUDE_BINARY");

        let result = find_claude_binary(None);

        // Restore both variables exactly as they were, including absence.
        match original_path {
            Some(p) => std::env::set_var("PATH", p),
            None => std::env::remove_var("PATH"),
        }
        if let Some(v) = original_override {
            std::env::set_var("SQLITE_GRAPHRAG_CLAUDE_BINARY", v);
        }

        assert!(result.is_err());
    }
}