Skip to main content

sqlite_graphrag/commands/
ingest_claude.rs

1//! Handler for `ingest --mode claude-code`.
2//!
3//! Orchestrates the locally installed Claude Code CLI binary (`claude -p`)
4//! to extract domain-specific entities and relationships from each file,
5//! then persists them via the same pipeline as `remember --graph-stdin`.
6//!
7//! Architecture: P1 One-Shot per file — each file spawns a separate
8//! `claude -p` process with `--json-schema` for guaranteed structured output.
9//! A SQLite queue DB tracks progress for resume/retry support.
10
11use crate::commands::ingest::IngestArgs;
12use crate::entity_type::EntityType;
13use crate::errors::AppError;
14use crate::paths::AppPaths;
15use crate::storage::connection::{ensure_db_ready, open_rw};
16use crate::storage::entities::{self, NewEntity, NewRelationship};
17use crate::storage::memories::{self, NewMemory};
18
19use rusqlite::Connection;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::{Path, PathBuf};
23use std::process::{Command, Stdio};
24use std::time::Instant;
25
/// Lowest Claude Code CLI version accepted by `validate_claude_version`;
/// a binary reporting an older version is rejected with a validation error.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
27
/// JSON Schema handed to `claude -p` through the `--json-schema` argument.
///
/// It describes the object that is deserialized into `ExtractionResult`:
/// a `name`, a `description`, a list of typed `entities`, and a list of
/// `relationships` whose `strength` is constrained to the range 0..=1.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
69
/// Instruction text passed as the prompt argument of `claude -p`.
///
/// The document to analyse is not part of this string; it is written to the
/// child process's stdin by `extract_with_claude`. The relationship types
/// listed here match the `relation` enum in `EXTRACTION_SCHEMA`.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
83
/// One element of the JSON array printed by `claude -p --output-format json`.
///
/// Every field may be absent in the input. Only the element whose `type` is
/// `"result"` is consulted when the output is interpreted.
#[derive(Debug, Deserialize)]
struct ClaudeOutputElement {
    /// Element discriminator; `"result"` marks the element carrying the outcome.
    r#type: Option<String>,
    /// Deserialized but not read in the visible code (hence `allow(dead_code)`).
    #[allow(dead_code)]
    subtype: Option<String>,
    /// Whether the run reported a failure; treated as `false` when missing.
    #[serde(default)]
    is_error: bool,
    /// Schema-conforming extraction payload; preferred over `result`.
    structured_output: Option<ExtractionResult>,
    /// Text result. Parsed as an `ExtractionResult` when `structured_output`
    /// is absent, and used as the error message when `error` is absent.
    result: Option<String>,
    /// Cost reported for the invocation; callers substitute `0.0` when missing.
    total_cost_usd: Option<f64>,
    /// Error message; preferred over `result` when `is_error` is set.
    error: Option<String>,
}
96
/// Structured extraction for a single document, shaped like the top-level
/// object described by `EXTRACTION_SCHEMA`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Name proposed for the document; the ingest loop normalizes it with
    /// `derive_kebab_name` before using it as the memory name.
    pub name: String,
    /// Summary stored as the memory's description.
    pub description: String,
    /// Entities found in the document.
    pub entities: Vec<ExtractedEntity>,
    /// Relationships between the extracted entities.
    pub relationships: Vec<ExtractedRelationship>,
}
104
/// A single entity as returned by the extractor, before type validation.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedEntity {
    /// Entity name as returned by the model.
    pub name: String,
    /// Entity type as a raw string; the ingest loop parses it into
    /// `EntityType` and skips the entity when the value is not recognized.
    pub entity_type: String,
}
110
/// A single relationship as returned by the extractor.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedRelationship {
    /// Name of the entity the relationship starts from.
    pub source: String,
    /// Name of the entity the relationship points to.
    pub target: String,
    /// Relation label; the ingest loop passes it through
    /// `crate::parsers::normalize_relation` before storing it.
    pub relation: String,
    /// Relationship strength; `EXTRACTION_SCHEMA` constrains it to 0..=1.
    pub strength: f64,
}
118
/// Progress event describing a pipeline phase, written as one JSON line via
/// `emit_json`. Optional fields are left out of the JSON when `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label; the visible code emits `"validate"` and `"scan"`.
    phase: &'a str,
    /// Path of the resolved Claude Code binary (set for the validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    claude_path: Option<&'a str>,
    /// Version string reported by the binary (set for the validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Directory being ingested (set for the scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of files matched by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Files newly inserted into the queue during this scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Files that were already present in the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
135
/// Per-file progress event, written as one JSON line via `emit_json`.
/// Optional fields are left out of the JSON when `None`.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the file the event refers to.
    file: &'a str,
    /// Memory name for the file; empty when the file failed before a name
    /// could be derived.
    name: &'a str,
    /// Outcome label; the visible code emits `"preview"` and `"failed"`.
    status: &'a str,
    /// Row id of the memory written for this file, when one exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of entities extracted from the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships extracted from the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Cost reported by Claude Code for this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Time spent on this file, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message when the file failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Zero-based position of this file within the run.
    index: usize,
    /// Total number of files collected for the run.
    total: usize,
}
156
/// Final event of a run, written as one JSON line via `emit_json`.
#[derive(Debug, Serialize)]
struct Summary {
    /// Marker field; the visible code always sets it to `true`.
    summary: bool,
    /// Number of files collected for the run.
    files_total: usize,
    /// Files processed successfully.
    completed: usize,
    /// Files that ended in failure.
    failed: usize,
    /// Files skipped because the queue already marked them as done.
    skipped: usize,
    /// Sum of extracted entities across files.
    entities_total: usize,
    /// Sum of extracted relationships across files.
    rels_total: usize,
    /// Accumulated cost reported by Claude Code.
    cost_usd: f64,
    /// Wall-clock duration of the run, in milliseconds.
    elapsed_ms: u64,
}
169
170/// Locates the Claude Code binary on the system.
171pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
172    if let Some(p) = explicit {
173        if p.exists() {
174            return Ok(p.to_path_buf());
175        }
176        return Err(AppError::Validation(format!(
177            "Claude Code binary not found at explicit path: {}",
178            p.display()
179        )));
180    }
181
182    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
183        let p = PathBuf::from(&env_path);
184        if p.exists() {
185            return Ok(p);
186        }
187    }
188
189    let name = if cfg!(windows) {
190        "claude.exe"
191    } else {
192        "claude"
193    };
194    if let Some(path_var) = std::env::var_os("PATH") {
195        for dir in std::env::split_paths(&path_var) {
196            let candidate = dir.join(name);
197            if candidate.exists() {
198                return Ok(candidate);
199            }
200        }
201    }
202
203    Err(AppError::Validation(
204        "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
205    ))
206}
207
208/// Validates that the Claude Code binary meets the minimum version.
209fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
210    let output = Command::new(binary)
211        .arg("--version")
212        .stdin(Stdio::null())
213        .stdout(Stdio::piped())
214        .stderr(Stdio::piped())
215        .output()
216        .map_err(AppError::Io)?;
217
218    if !output.status.success() {
219        return Err(AppError::Validation(
220            "failed to run 'claude --version'".to_string(),
221        ));
222    }
223
224    let version_str = String::from_utf8(output.stdout)
225        .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
226    let version = version_str.trim().to_string();
227
228    // Extract the numeric version part before first space or paren, e.g. "2.1.149 (Claude Code)" -> "2.1.149"
229    let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
230
231    fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
232        let parts: Vec<&str> = s.splitn(3, '.').collect();
233        if parts.len() < 2 {
234            return None;
235        }
236        let major = parts[0].parse::<u64>().ok()?;
237        let minor = parts[1].parse::<u64>().ok()?;
238        let patch = parts
239            .get(2)
240            .and_then(|p| p.parse::<u64>().ok())
241            .unwrap_or(0);
242        Some((major, minor, patch))
243    }
244
245    if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
246        if actual < min {
247            return Err(AppError::Validation(format!(
248                "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
249            )));
250        }
251    }
252
253    Ok(version)
254}
255
256/// Invokes `claude -p` for a single file and returns the extraction result.
257///
258/// Uses `wait-timeout` for cross-platform subprocess timeout, `env_clear()`
259/// for least-privilege environment, and `--bare` when `ANTHROPIC_API_KEY`
260/// is available (faster startup) vs `--dangerously-skip-permissions` for
261/// OAuth users.
262fn extract_with_claude(
263    binary: &Path,
264    file_content: &[u8],
265    model: Option<&str>,
266    timeout_secs: u64,
267) -> Result<(ExtractionResult, f64), AppError> {
268    use wait_timeout::ChildExt;
269
270    let mut cmd = Command::new(binary);
271
272    cmd.env_clear();
273    for var in &[
274        "PATH",
275        "HOME",
276        "USER",
277        "SHELL",
278        "TERM",
279        "LANG",
280        "XDG_CONFIG_HOME",
281        "XDG_DATA_HOME",
282        "XDG_RUNTIME_DIR",
283        "ANTHROPIC_API_KEY",
284        "CLAUDE_CONFIG_DIR",
285        "TMPDIR",
286        "TMP",
287        "TEMP",
288        "DYLD_FALLBACK_LIBRARY_PATH",
289    ] {
290        if let Ok(val) = std::env::var(var) {
291            cmd.env(var, val);
292        }
293    }
294
295    #[cfg(windows)]
296    for var in &[
297        "LOCALAPPDATA",
298        "APPDATA",
299        "USERPROFILE",
300        "SystemRoot",
301        "COMSPEC",
302        "PATHEXT",
303        "HOMEPATH",
304        "HOMEDRIVE",
305    ] {
306        if let Ok(val) = std::env::var(var) {
307            cmd.env(var, val);
308        }
309    }
310
311    cmd.arg("-p")
312        .arg(EXTRACTION_PROMPT)
313        .arg("--output-format")
314        .arg("json")
315        .arg("--json-schema")
316        .arg(EXTRACTION_SCHEMA)
317        .arg("--max-turns")
318        .arg("3")
319        .arg("--no-session-persistence");
320
321    if std::env::var("ANTHROPIC_API_KEY").is_ok() {
322        cmd.arg("--bare");
323    } else {
324        cmd.arg("--dangerously-skip-permissions");
325    }
326
327    if let Some(m) = model {
328        cmd.arg("--model").arg(m);
329    }
330
331    cmd.stdin(Stdio::piped())
332        .stdout(Stdio::piped())
333        .stderr(Stdio::piped());
334
335    let mut child = cmd.spawn().map_err(|e| {
336        AppError::Io(std::io::Error::new(
337            e.kind(),
338            format!("failed to spawn claude: {e}"),
339        ))
340    })?;
341
342    let stdin_data = file_content.to_vec();
343    let mut child_stdin = child
344        .stdin
345        .take()
346        .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
347    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
348        child_stdin.write_all(&stdin_data)?;
349        Ok(())
350    });
351
352    let timeout = std::time::Duration::from_secs(timeout_secs);
353    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
354
355    match status {
356        Some(exit_status) => {
357            stdin_thread
358                .join()
359                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
360                .map_err(AppError::Io)?;
361
362            let mut stdout_buf = Vec::new();
363            let mut stderr_buf = Vec::new();
364            if let Some(mut out) = child.stdout.take() {
365                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
366            }
367            if let Some(mut err) = child.stderr.take() {
368                std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
369            }
370
371            if !exit_status.success() {
372                let stdout_str = String::from_utf8_lossy(&stdout_buf);
373                if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
374                {
375                    if let Some(re) = elements
376                        .iter()
377                        .find(|e| e.r#type.as_deref() == Some("result"))
378                    {
379                        if re.is_error {
380                            let err_msg = re
381                                .error
382                                .as_deref()
383                                .or(re.result.as_deref())
384                                .unwrap_or("unknown error");
385                            if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
386                                return Err(AppError::Validation(format!(
387                                    "RATE_LIMITED: {err_msg}"
388                                )));
389                            }
390                            if err_msg.contains("Not logged in")
391                                || err_msg.contains("authentication")
392                            {
393                                tracing::warn!(
394                                    target: "ingest",
395                                    "Claude Code authentication failed. Re-authenticate interactively with: claude"
396                                );
397                            }
398                            return Err(AppError::Validation(format!(
399                                "claude -p failed: {err_msg}"
400                            )));
401                        }
402                    }
403                }
404                let stderr_str = String::from_utf8_lossy(&stderr_buf);
405                if stderr_str.contains("auth") || stderr_str.contains("login") {
406                    tracing::warn!(
407                        target: "ingest",
408                        "Claude Code authentication may have failed. Re-authenticate with: claude"
409                    );
410                }
411                return Err(AppError::Validation(format!(
412                    "claude -p exited with code {:?}: {}",
413                    exit_status.code(),
414                    stderr_str.trim()
415                )));
416            }
417
418            let stdout = String::from_utf8(stdout_buf)
419                .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
420            parse_claude_output(&stdout)
421        }
422        None => {
423            tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
424            let _ = child.kill();
425            let _ = child.wait();
426            let _ = stdin_thread.join();
427            Err(AppError::Validation(format!(
428                "claude -p timed out after {timeout_secs} seconds"
429            )))
430        }
431    }
432}
433
434/// Parses the JSON array output from `claude -p --output-format json`.
435fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64), AppError> {
436    let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
437        AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
438    })?;
439
440    let result_elem = elements
441        .iter()
442        .find(|e| e.r#type.as_deref() == Some("result"))
443        .ok_or_else(|| {
444            AppError::Validation("claude output missing 'result' element".to_string())
445        })?;
446
447    if result_elem.is_error {
448        let err_msg = result_elem
449            .error
450            .as_deref()
451            .or(result_elem.result.as_deref())
452            .unwrap_or("unknown error");
453        if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
454            return Err(AppError::Validation(format!("RATE_LIMITED: {err_msg}")));
455        }
456        return Err(AppError::Validation(format!(
457            "claude extraction failed: {err_msg}"
458        )));
459    }
460
461    let extraction = result_elem
462        .structured_output
463        .clone()
464        .or_else(|| {
465            result_elem
466                .result
467                .as_ref()
468                .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
469        })
470        .ok_or_else(|| {
471            AppError::Validation("claude result missing structured_output and result field".into())
472        })?;
473
474    let cost = result_elem.total_cost_usd.unwrap_or(0.0);
475
476    Ok((extraction, cost))
477}
478
479fn emit_json<T: Serialize>(value: &T) {
480    if let Ok(json) = serde_json::to_string(value) {
481        let stdout = std::io::stdout();
482        let mut lock = stdout.lock();
483        let _ = writeln!(lock, "{json}");
484        let _ = lock.flush();
485    }
486}
487
488/// Collects files matching the pattern (reuses ingest logic).
489fn collect_matching_files(
490    dir: &Path,
491    pattern: &str,
492    recursive: bool,
493    max_files: usize,
494) -> Result<Vec<PathBuf>, AppError> {
495    let mut files = Vec::new();
496    super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
497    files.sort();
498
499    if files.len() > max_files {
500        return Err(AppError::Validation(format!(
501            "found {} files, exceeds --max-files cap of {}",
502            files.len(),
503            max_files
504        )));
505    }
506
507    Ok(files)
508}
509
510/// Opens or creates the queue database for tracking ingest progress.
511fn open_queue_db(path: &str) -> Result<Connection, AppError> {
512    let conn = Connection::open(path)?;
513
514    conn.pragma_update(None, "journal_mode", "wal")?;
515
516    conn.execute_batch(
517        "CREATE TABLE IF NOT EXISTS queue (
518            id          INTEGER PRIMARY KEY AUTOINCREMENT,
519            file_path   TEXT NOT NULL UNIQUE,
520            name        TEXT,
521            status      TEXT NOT NULL DEFAULT 'pending',
522            memory_id   INTEGER,
523            entities    INTEGER DEFAULT 0,
524            rels        INTEGER DEFAULT 0,
525            error       TEXT,
526            cost_usd    REAL DEFAULT 0.0,
527            attempt     INTEGER DEFAULT 0,
528            elapsed_ms  INTEGER,
529            created_at  TEXT DEFAULT (datetime('now')),
530            done_at     TEXT
531        );
532        CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
533    )?;
534
535    Ok(conn)
536}
537
538/// Main entry point for `ingest --mode claude-code`.
539pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
540    let started = Instant::now();
541
542    if !args.dir.exists() {
543        return Err(AppError::Validation(format!(
544            "directory not found: {}",
545            args.dir.display()
546        )));
547    }
548
549    // Stage 1: Validate
550    let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
551    let version = validate_claude_version(&claude_binary)?;
552    tracing::info!(
553        target: "ingest",
554        binary = %claude_binary.display(),
555        version = %version,
556        "Claude Code binary validated"
557    );
558
559    emit_json(&PhaseEvent {
560        phase: "validate",
561        claude_path: claude_binary.to_str(),
562        version: Some(&version),
563        dir: None,
564        files_total: None,
565        files_new: None,
566        files_existing: None,
567    });
568
569    // Stage 2: Scan
570    let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
571
572    let queue_conn = open_queue_db(&args.queue_db)?;
573
574    if args.resume {
575        let reset = queue_conn
576            .execute(
577                "UPDATE queue SET status='pending' WHERE status='processing'",
578                [],
579            )
580            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
581        if reset > 0 {
582            tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
583        }
584    }
585
586    if args.retry_failed {
587        let count = queue_conn
588            .execute(
589                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
590                [],
591            )
592            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
593        tracing::info!(target: "ingest", count, "retrying failed files");
594    }
595
596    if !args.resume && !args.retry_failed {
597        queue_conn
598            .execute("DELETE FROM queue", [])
599            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
600    }
601
602    let mut new_count = 0usize;
603    let mut existing_count = 0usize;
604
605    if !args.retry_failed {
606        for file in &files {
607            let file_str = file.to_string_lossy().to_string();
608            let inserted = queue_conn
609                .execute(
610                    "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
611                    rusqlite::params![file_str],
612                )
613                .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
614            if inserted > 0 {
615                new_count += 1;
616            } else {
617                existing_count += 1;
618            }
619        }
620    }
621
622    emit_json(&PhaseEvent {
623        phase: "scan",
624        claude_path: None,
625        version: None,
626        dir: args.dir.to_str(),
627        files_total: Some(files.len()),
628        files_new: Some(new_count),
629        files_existing: Some(existing_count),
630    });
631
632    if args.dry_run {
633        for (idx, file) in files.iter().enumerate() {
634            let (name, _truncated, _orig) =
635                super::ingest::derive_kebab_name(file, args.max_name_length);
636            emit_json(&FileEvent {
637                file: &file.to_string_lossy(),
638                name: &name,
639                status: "preview",
640                memory_id: None,
641                entities: None,
642                rels: None,
643                cost_usd: None,
644                elapsed_ms: None,
645                error: None,
646                index: idx,
647                total: files.len(),
648            });
649        }
650        emit_json(&Summary {
651            summary: true,
652            files_total: files.len(),
653            completed: 0,
654            failed: 0,
655            skipped: 0,
656            entities_total: 0,
657            rels_total: 0,
658            cost_usd: 0.0,
659            elapsed_ms: started.elapsed().as_millis() as u64,
660        });
661        if !args.keep_queue {
662            let _ = std::fs::remove_file(&args.queue_db);
663        }
664        return Ok(());
665    }
666
667    // Stage 3: Process
668    let paths = AppPaths::resolve(args.db.as_deref())?;
669    ensure_db_ready(&paths)?;
670    let conn = open_rw(&paths.db)?;
671    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
672    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
673    let memory_type_str = args.r#type.as_str().to_string();
674
675    let mut completed = 0usize;
676    let mut failed = 0usize;
677    let skipped_initial: usize = queue_conn
678        .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
679            r.get::<_, usize>(0)
680        })
681        .unwrap_or(0);
682    let skipped = skipped_initial;
683    let mut entities_total = 0usize;
684    let mut rels_total = 0usize;
685    let mut cost_total = 0.0f64;
686    let total = files.len();
687
688    let mut backoff_secs = args.rate_limit_wait;
689
690    loop {
691        let pending: Option<(i64, String)> = queue_conn
692            .query_row(
693                "UPDATE queue SET status='processing', attempt=attempt+1 \
694                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
695                 RETURNING id, file_path",
696                [],
697                |row| Ok((row.get(0)?, row.get(1)?)),
698            )
699            .ok();
700
701        let (queue_id, file_path) = match pending {
702            Some(p) => p,
703            None => break,
704        };
705
706        let file_started = Instant::now();
707
708        // G05: reject files that exceed the 10 MB stdin limit
709        const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
710        if let Ok(meta) = std::fs::metadata(&file_path) {
711            if meta.len() > MAX_FILE_SIZE {
712                let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
713                let _ = queue_conn.execute(
714                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
715                    rusqlite::params![err_msg, queue_id],
716                );
717                let current_index = completed + failed + skipped;
718                failed += 1;
719                emit_json(&FileEvent {
720                    file: &file_path,
721                    name: "",
722                    status: "failed",
723                    memory_id: None,
724                    entities: None,
725                    rels: None,
726                    cost_usd: None,
727                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
728                    error: Some(&err_msg),
729                    index: current_index,
730                    total,
731                });
732                if args.fail_fast {
733                    break;
734                }
735                continue;
736            }
737        }
738
739        let file_content = match std::fs::read(&file_path) {
740            Ok(c) => c,
741            Err(e) => {
742                let err_msg = format!("IO error: {e}");
743                let _ = queue_conn.execute(
744                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
745                    rusqlite::params![err_msg, queue_id],
746                );
747                let current_index = completed + failed + skipped;
748                failed += 1;
749                emit_json(&FileEvent {
750                    file: &file_path,
751                    name: "",
752                    status: "failed",
753                    memory_id: None,
754                    entities: None,
755                    rels: None,
756                    cost_usd: None,
757                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
758                    error: Some(&err_msg),
759                    index: current_index,
760                    total,
761                });
762                if args.fail_fast {
763                    break;
764                }
765                continue;
766            }
767        };
768
769        // B07: retry once on cold-start failure (Claude Code Issue #23265)
770        let max_extract_attempts: u32 = 2;
771        let mut extraction_result: Option<(ExtractionResult, f64)> = None;
772        let mut last_extract_err: Option<String> = None;
773
774        for attempt in 1..=max_extract_attempts {
775            match extract_with_claude(
776                &claude_binary,
777                &file_content,
778                args.claude_model.as_deref(),
779                args.claude_timeout,
780            ) {
781                Ok(result) => {
782                    extraction_result = Some(result);
783                    break;
784                }
785                Err(ref e) if format!("{e}").contains("RATE_LIMITED") => {
786                    last_extract_err = Some(format!("{e}"));
787                    break;
788                }
789                Err(e) => {
790                    let msg = format!("{e}");
791                    if attempt < max_extract_attempts {
792                        tracing::warn!(target: "ingest", attempt, error = %msg, "extraction failed, retrying (cold-start workaround)");
793                        std::thread::sleep(std::time::Duration::from_secs(2));
794                    }
795                    last_extract_err = Some(msg);
796                }
797            }
798        }
799
800        if let Some((extraction, cost)) = extraction_result {
801            backoff_secs = args.rate_limit_wait;
802
803            let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
804                std::path::Path::new(&extraction.name),
805                args.max_name_length,
806            );
807            let name = &normalized_name;
808            let ent_count = extraction.entities.len();
809            let rel_count = extraction.relationships.len();
810
811            let new_entities: Vec<NewEntity> = extraction
812                .entities
813                .iter()
814                .filter_map(|e| match e.entity_type.parse::<EntityType>() {
815                    Ok(et) => Some(NewEntity {
816                        name: e.name.clone(),
817                        entity_type: et,
818                        description: None,
819                    }),
820                    Err(_) => {
821                        tracing::warn!(
822                            target: "ingest",
823                            entity = %e.name,
824                            entity_type = %e.entity_type,
825                            "entity type not recognized, skipping"
826                        );
827                        None
828                    }
829                })
830                .collect();
831
832            let new_relationships: Vec<NewRelationship> = extraction
833                .relationships
834                .iter()
835                .map(|r| NewRelationship {
836                    source: r.source.clone(),
837                    target: r.target.clone(),
838                    relation: crate::parsers::normalize_relation(&r.relation),
839                    strength: r.strength,
840                    description: None,
841                })
842                .collect();
843
844            let body_str = String::from_utf8_lossy(&file_content);
845            let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
846            let new_memory = NewMemory {
847                name: name.clone(),
848                namespace: namespace.clone(),
849                memory_type: memory_type_str.clone(),
850                description: extraction.description.clone(),
851                body: body_str.to_string(),
852                body_hash,
853                session_id: None,
854                source: "agent".to_string(),
855                metadata: serde_json::Value::Object(serde_json::Map::new()),
856            };
857
858            // B06: deduplication — update existing memory instead of failing on UNIQUE
859            let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
860                Some((existing_id, is_deleted)) => {
861                    if is_deleted {
862                        memories::clear_deleted_at(&conn, existing_id)?;
863                    }
864                    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
865                        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
866                        rusqlite::params![existing_id],
867                        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
868                    )?;
869                    memories::update(&conn, existing_id, &new_memory, None)?;
870                    memories::sync_fts_after_update(
871                        &conn,
872                        existing_id,
873                        &old_name,
874                        &old_desc,
875                        &old_body,
876                        &new_memory.name,
877                        &new_memory.description,
878                        &new_memory.body,
879                    )?;
880                    tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
881                    existing_id
882                }
883                None => match memories::insert(&conn, &new_memory) {
884                    Ok(id) => id,
885                    Err(e) => {
886                        let err_msg = format!("{e}");
887                        let _ = queue_conn.execute(
888                                "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
889                                rusqlite::params![err_msg, queue_id],
890                            );
891                        let current_index = completed + failed + skipped;
892                        failed += 1;
893                        emit_json(&FileEvent {
894                            file: &file_path,
895                            name,
896                            status: "failed",
897                            memory_id: None,
898                            entities: None,
899                            rels: None,
900                            cost_usd: Some(cost),
901                            elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
902                            error: Some(&err_msg),
903                            index: current_index,
904                            total,
905                        });
906                        cost_total += cost;
907                        if args.fail_fast {
908                            break;
909                        }
910                        continue;
911                    }
912                },
913            };
914
915            for ent in &new_entities {
916                match entities::upsert_entity(&conn, &namespace, ent) {
917                    Ok(eid) => {
918                        let _ = entities::link_memory_entity(&conn, memory_id, eid);
919                    }
920                    Err(e) => {
921                        tracing::warn!(
922                            target: "ingest",
923                            entity = %ent.name,
924                            error = %e,
925                            "entity skipped due to validation error"
926                        );
927                    }
928                }
929            }
930            for rel in &new_relationships {
931                crate::parsers::warn_if_non_canonical(&rel.relation);
932                let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
933                let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
934                if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
935                    let _ = conn.execute(
936                        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
937                        rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
938                    );
939                }
940            }
941
            // G01: embedding pipeline — enables recall to find memories created via --mode claude-code
943            let body_text = String::from_utf8_lossy(&file_content).into_owned();
944            let snippet: String = body_text.chars().take(200).collect();
945            let chunks_info =
946                crate::chunking::split_into_chunks_hierarchical(&body_text, tokenizer);
947
948            let embedding_result = if chunks_info.len() <= 1 {
949                crate::daemon::embed_passage_or_local(&paths.models, &body_text)
950            } else {
951                let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
952                let mut multi_ok = true;
953                for chunk in &chunks_info {
954                    let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
955                    match crate::daemon::embed_passage_or_local(&paths.models, chunk_text) {
956                        Ok(emb) => chunk_embeddings.push(emb),
957                        Err(e) => {
958                            tracing::warn!(
959                                target: "ingest",
960                                file = %file_path,
961                                error = %e,
962                                "chunk embedding failed, skipping vector index for this file"
963                            );
964                            multi_ok = false;
965                            break;
966                        }
967                    }
968                }
969                if multi_ok {
970                    let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
971                    // persist per-chunk vectors
972                    if let Err(e) = crate::storage::chunks::insert_chunk_slices(
973                        &conn,
974                        memory_id,
975                        &body_text,
976                        &chunks_info,
977                    ) {
978                        tracing::warn!(
979                            target: "ingest",
980                            file = %file_path,
981                            error = %e,
982                            "chunk slice insert failed"
983                        );
984                    } else {
985                        for (i, emb) in chunk_embeddings.iter().enumerate() {
986                            if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
987                                &conn, i as i64, memory_id, i as i32, emb,
988                            ) {
989                                tracing::warn!(
990                                    target: "ingest",
991                                    file = %file_path,
992                                    chunk = i,
993                                    error = %e,
994                                    "chunk vec upsert failed"
995                                );
996                            }
997                        }
998                    }
999                    Ok(aggregated)
1000                } else {
1001                    // fallback: embed whole body for the memory-level vector
1002                    crate::daemon::embed_passage_or_local(&paths.models, &body_text)
1003                }
1004            };
1005
1006            match embedding_result {
1007                Ok(embedding) => {
1008                    if let Err(e) = memories::upsert_vec(
1009                        &conn,
1010                        memory_id,
1011                        &namespace,
1012                        &memory_type_str,
1013                        &embedding,
1014                        name,
1015                        &snippet,
1016                    ) {
1017                        tracing::warn!(
1018                            target: "ingest",
1019                            file = %file_path,
1020                            error = %e,
1021                            "memory vec upsert failed; recall may not find this memory"
1022                        );
1023                    }
1024                    // embed each entity that was successfully upserted
1025                    for ent in &new_entities {
1026                        if let Ok(Some(eid)) =
1027                            entities::find_entity_id(&conn, &namespace, &ent.name)
1028                        {
1029                            let entity_text = ent.name.clone();
1030                            match crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
1031                            {
1032                                Ok(emb) => {
1033                                    if let Err(e) = entities::upsert_entity_vec(
1034                                        &conn,
1035                                        eid,
1036                                        &namespace,
1037                                        ent.entity_type,
1038                                        &emb,
1039                                        &ent.name,
1040                                    ) {
1041                                        tracing::warn!(
1042                                            target: "ingest",
1043                                            entity = %ent.name,
1044                                            error = %e,
1045                                            "entity vec upsert failed"
1046                                        );
1047                                    }
1048                                }
1049                                Err(e) => {
1050                                    tracing::warn!(
1051                                        target: "ingest",
1052                                        entity = %ent.name,
1053                                        error = %e,
1054                                        "entity embedding failed"
1055                                    );
1056                                }
1057                            }
1058                        }
1059                    }
1060                }
1061                Err(e) => {
1062                    tracing::warn!(
1063                        target: "ingest",
1064                        file = %file_path,
1065                        error = %e,
1066                        "memory embedding failed; recall will not find this memory"
1067                    );
1068                }
1069            }
1070
1071            let _ = queue_conn.execute(
1072                "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1073                rusqlite::params![
1074                    name,
1075                    memory_id,
1076                    ent_count,
1077                    rel_count,
1078                    cost,
1079                    file_started.elapsed().as_millis() as i64,
1080                    queue_id
1081                ],
1082            );
1083
1084            let current_index = completed + failed + skipped;
1085            completed += 1;
1086            entities_total += ent_count;
1087            rels_total += rel_count;
1088            cost_total += cost;
1089
1090            emit_json(&FileEvent {
1091                file: &file_path,
1092                name,
1093                status: "done",
1094                memory_id: Some(memory_id),
1095                entities: Some(ent_count),
1096                rels: Some(rel_count),
1097                cost_usd: Some(cost),
1098                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1099                error: None,
1100                index: current_index,
1101                total,
1102            });
1103        } else if let Some(ref err_str) = last_extract_err {
1104            if err_str.contains("RATE_LIMITED") {
1105                tracing::warn!(
1106                    target: "ingest",
1107                    wait_seconds = backoff_secs,
1108                    "rate limited, waiting before retry"
1109                );
1110                let _ = queue_conn.execute(
1111                    "UPDATE queue SET status='pending' WHERE id=?1",
1112                    rusqlite::params![queue_id],
1113                );
1114                std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
1115                backoff_secs = (backoff_secs * 2).min(900);
1116                continue;
1117            } else {
1118                let _ = queue_conn.execute(
1119                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1120                    rusqlite::params![err_str, queue_id],
1121                );
1122                let current_index = completed + failed + skipped;
1123                failed += 1;
1124                emit_json(&FileEvent {
1125                    file: &file_path,
1126                    name: "",
1127                    status: "failed",
1128                    memory_id: None,
1129                    entities: None,
1130                    rels: None,
1131                    cost_usd: None,
1132                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1133                    error: Some(err_str),
1134                    index: current_index,
1135                    total,
1136                });
1137                if args.fail_fast {
1138                    break;
1139                }
1140            }
1141        }
1142
1143        if let Some(budget) = args.max_cost_usd {
1144            if cost_total >= budget {
1145                tracing::warn!(
1146                    target: "ingest",
1147                    spent = cost_total,
1148                    budget = budget,
1149                    "budget exceeded, stopping"
1150                );
1151                break;
1152            }
1153        }
1154    }
1155
1156    // Stage 4: Summary
1157    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1158    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1159
1160    emit_json(&Summary {
1161        summary: true,
1162        files_total: total,
1163        completed,
1164        failed,
1165        skipped,
1166        entities_total,
1167        rels_total,
1168        cost_usd: cost_total,
1169        elapsed_ms: started.elapsed().as_millis() as u64,
1170    });
1171
1172    if !args.keep_queue && failed == 0 {
1173        let _ = std::fs::remove_file(&args.queue_db);
1174    }
1175
1176    Ok(())
1177}
1178
// Unit tests for the Claude Code ingest handler: extraction-schema validity,
// the success/error paths of `parse_claude_output`, and binary discovery.
#[cfg(test)]
mod tests {
    use super::*;

    /// Puts a process environment variable back the way it was when dropped.
    ///
    /// The value present at construction time is captured (including the
    /// "not set at all" case) so a test can mutate the variable and still
    /// leave the environment untouched afterwards, even if the code under
    /// test panics.
    struct EnvVarGuard {
        key: &'static str,
        original: Option<std::ffi::OsString>,
    }

    impl EnvVarGuard {
        /// Remembers the current value of `key`, then overrides it with `value`.
        fn set(key: &'static str, value: &str) -> Self {
            let original = std::env::var_os(key);
            std::env::set_var(key, value);
            Self { key, original }
        }

        /// Remembers the current value of `key`, then removes the variable.
        fn unset(key: &'static str) -> Self {
            let original = std::env::var_os(key);
            std::env::remove_var(key);
            Self { key, original }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            // A variable that was originally absent must be removed again,
            // not left holding the temporary test value.
            match self.original.take() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// The embedded JSON schema constant must parse as JSON.
    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// A successful `result` event carrying `structured_output` yields the
    /// extraction plus the reported `total_cost_usd`.
    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    /// A `result` event flagged `is_error` surfaces its `error` text.
    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// A rate-limit error is reported with the `RATE_LIMITED` marker that the
    /// ingest loop matches on to decide whether to back off and retry.
    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("RATE_LIMITED"));
    }

    /// Input that is not JSON at all is rejected with an error.
    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    /// With `PATH` pointing at a directory that does not exist and the
    /// `SQLITE_GRAPHRAG_CLAUDE_BINARY` variable removed, discovery without an
    /// explicit binary argument must fail.
    #[test]
    fn test_find_claude_binary_not_found() {
        // Environment variables are process-wide, so both are restored to
        // their exact prior state (set or unset) as soon as the lookup
        // returns; the guards are dropped at the end of this inner block.
        let result = {
            let _path = EnvVarGuard::set("PATH", "/nonexistent");
            let _binary_override = EnvVarGuard::unset("SQLITE_GRAPHRAG_CLAUDE_BINARY");
            find_claude_binary(None)
        };
        assert!(result.is_err());
    }

    /// When `structured_output` is null, the JSON string in the `result`
    /// field is parsed as the extraction instead.
    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost) = parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    /// An error event that carries its message in `result` (rather than
    /// `error`) still surfaces that message.
    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    /// Every `entity_type` value advertised in the schema must parse into
    /// `EntityType`, so the schema and the enum cannot drift apart silently.
    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t
                .as_str()
                .expect("schema entity_type enum values must be strings");
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}