Skip to main content

sqlite_graphrag/commands/
ingest_claude.rs

1//! Handler for `ingest --mode claude-code`.
2//!
3//! Orchestrates the locally installed Claude Code CLI binary (`claude -p`)
4//! to extract domain-specific entities and relationships from each file,
5//! then persists them via the same pipeline as `remember --graph-stdin`.
6//!
7//! Architecture: P1 One-Shot per file — each file spawns a separate
8//! `claude -p` process with `--json-schema` for guaranteed structured output.
9//! A SQLite queue DB tracks progress for resume/retry support.
10
11use crate::commands::ingest::IngestArgs;
12use crate::entity_type::EntityType;
13use crate::errors::AppError;
14use crate::paths::AppPaths;
15use crate::storage::connection::{ensure_db_ready, open_rw};
16use crate::storage::entities::{self, NewEntity, NewRelationship};
17use crate::storage::memories::{self, NewMemory};
18
19use rusqlite::Connection;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::{Path, PathBuf};
23use std::process::{Command, Stdio};
24use std::time::Instant;
25
/// Minimum Claude Code CLI version accepted by `validate_claude_version`.
///
/// Compared as a `major.minor.patch` triple; an older reported version is
/// rejected with a validation error.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
27
/// JSON Schema handed to `claude -p` through the `--json-schema` flag.
///
/// Its shape mirrors [`ExtractionResult`]: a `name`, a `description`, a list
/// of typed entities and a list of typed relationships with a `strength`
/// between 0 and 1. Unknown properties are rejected at every level.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
69
/// Instruction text passed as the `-p` argument on every `claude` invocation.
///
/// The document to analyse is not embedded here; `extract_with_claude` pipes
/// the file bytes to the child's stdin. The relationship types listed in the
/// rules are the same set enumerated in `EXTRACTION_SCHEMA`.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
83
/// One element of the JSON array printed by `claude -p --output-format json`.
///
/// Every field is optional (or defaulted) so that elements of any kind can be
/// deserialized into the same struct.
#[derive(Debug, Deserialize)]
struct ClaudeOutputElement {
    /// Element kind; the parsing code here looks for `"system"` and `"result"`.
    r#type: Option<String>,
    /// Element sub-kind; `"init"` on the `system` element carries `apiKeySource`.
    subtype: Option<String>,
    /// Error flag of the element; treated as `false` when the key is absent.
    #[serde(default)]
    is_error: bool,
    /// Schema-validated payload, preferred over parsing `result` as JSON.
    structured_output: Option<ExtractionResult>,
    /// Free-text result; used as fallback payload and as fallback error text.
    result: Option<String>,
    /// Reported cost of the call; callers treat a missing value as `0.0`.
    total_cost_usd: Option<f64>,
    /// Error message, when the element carries one.
    error: Option<String>,
    /// Why the run ended; `"max_turns"` is handled specially on failure.
    terminal_reason: Option<String>,
    /// Value of the `apiKeySource` key; `"none"` is interpreted as OAuth.
    #[serde(rename = "apiKeySource")]
    api_key_source: Option<String>,
}
98
/// Structured extraction returned by Claude for a single document.
///
/// Field names match the top-level properties of `EXTRACTION_SCHEMA`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Name proposed for the document (the prompt asks for short kebab-case).
    pub name: String,
    /// Summary of the document (the prompt asks for one sentence).
    pub description: String,
    /// Entities found in the document.
    pub entities: Vec<ExtractedEntity>,
    /// Relationships between the extracted entities.
    pub relationships: Vec<ExtractedRelationship>,
}
106
/// A single entity as reported by Claude, before type validation.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedEntity {
    /// Entity name as returned by the model.
    pub name: String,
    /// Raw entity type string; parsed into `EntityType` during ingestion.
    pub entity_type: String,
}
112
/// A single directed relationship as reported by Claude.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedRelationship {
    /// Name of the source entity.
    pub source: String,
    /// Name of the target entity.
    pub target: String,
    /// Raw relation label; normalized before being stored.
    pub relation: String,
    /// Relationship strength; the schema constrains it to the range 0..=1.
    pub strength: f64,
}
120
/// Progress event describing a pipeline phase, emitted as one JSON line.
///
/// Only `phase` is always present; every `None` field is omitted from the
/// serialized object.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase label, e.g. `"validate"` or `"scan"`.
    phase: &'a str,
    /// Path of the Claude Code binary in use.
    #[serde(skip_serializing_if = "Option::is_none")]
    claude_path: Option<&'a str>,
    /// Version string reported by the binary.
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Directory being scanned.
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of files matched by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Number of files newly inserted into the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Number of files that were already present in the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
137
/// Per-file progress event, emitted as one JSON line.
///
/// `None` fields are omitted from the serialized object.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the file the event refers to.
    file: &'a str,
    /// Memory name derived for the file; empty when none was derived.
    name: &'a str,
    /// Outcome label, e.g. `"preview"`, `"failed"` or `"skipped"`.
    status: &'a str,
    /// Identifier of the stored memory, when one was created.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of entities extracted from the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships extracted from the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Cost reported for processing the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on the file, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Failure or skip reason.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Position of this event within the run's progress count.
    index: usize,
    /// Total number of files in the run.
    total: usize,
}
158
/// Final totals for an ingest run, emitted as one JSON line.
#[derive(Debug, Serialize)]
struct Summary {
    /// Marker field serialized alongside the totals (the dry-run path sets it to `true`).
    summary: bool,
    /// Number of files matched by the scan.
    files_total: usize,
    /// Number of files processed successfully.
    completed: usize,
    /// Number of files that failed.
    failed: usize,
    /// Number of files skipped.
    skipped: usize,
    /// Entities counted across all files.
    entities_total: usize,
    /// Relationships counted across all files.
    rels_total: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Wall-clock duration of the run, in milliseconds.
    elapsed_ms: u64,
}
171
172/// Locates the Claude Code binary on the system.
173pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
174    if let Some(p) = explicit {
175        if p.exists() {
176            return Ok(p.to_path_buf());
177        }
178        return Err(AppError::Validation(format!(
179            "Claude Code binary not found at explicit path: {}",
180            p.display()
181        )));
182    }
183
184    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
185        let p = PathBuf::from(&env_path);
186        if p.exists() {
187            return Ok(p);
188        }
189    }
190
191    let name = if cfg!(windows) {
192        "claude.exe"
193    } else {
194        "claude"
195    };
196    if let Some(path_var) = std::env::var_os("PATH") {
197        for dir in std::env::split_paths(&path_var) {
198            let candidate = dir.join(name);
199            if candidate.exists() {
200                return Ok(candidate);
201            }
202        }
203    }
204
205    Err(AppError::Validation(
206        "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
207    ))
208}
209
210/// Validates that the Claude Code binary meets the minimum version.
211fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
212    let output = Command::new(binary)
213        .arg("--version")
214        .stdin(Stdio::null())
215        .stdout(Stdio::piped())
216        .stderr(Stdio::piped())
217        .output()
218        .map_err(AppError::Io)?;
219
220    if !output.status.success() {
221        return Err(AppError::Validation(
222            "failed to run 'claude --version'".to_string(),
223        ));
224    }
225
226    let version_str = String::from_utf8(output.stdout)
227        .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
228    let version = version_str.trim().to_string();
229
230    // Extract the numeric version part before first space or paren, e.g. "2.1.149 (Claude Code)" -> "2.1.149"
231    let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
232
233    fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
234        let parts: Vec<&str> = s.splitn(3, '.').collect();
235        if parts.len() < 2 {
236            return None;
237        }
238        let major = parts[0].parse::<u64>().ok()?;
239        let minor = parts[1].parse::<u64>().ok()?;
240        let patch = parts
241            .get(2)
242            .and_then(|p| p.parse::<u64>().ok())
243            .unwrap_or(0);
244        Some((major, minor, patch))
245    }
246
247    if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
248        if actual < min {
249            return Err(AppError::Validation(format!(
250                "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
251            )));
252        }
253    }
254
255    Ok(version)
256}
257
258/// Invokes `claude -p` for a single file and returns the extraction result.
259///
260/// Uses `wait-timeout` for cross-platform subprocess timeout, `env_clear()`
261/// for least-privilege environment, and `--bare` when `ANTHROPIC_API_KEY`
262/// is available (faster startup) vs `--dangerously-skip-permissions` for
263/// OAuth users.
264fn extract_with_claude(
265    binary: &Path,
266    file_content: &[u8],
267    model: Option<&str>,
268    timeout_secs: u64,
269) -> Result<(ExtractionResult, f64, bool), AppError> {
270    use wait_timeout::ChildExt;
271
272    let mut cmd = Command::new(binary);
273
274    cmd.env_clear();
275    for var in &[
276        "PATH",
277        "HOME",
278        "USER",
279        "SHELL",
280        "TERM",
281        "LANG",
282        "XDG_CONFIG_HOME",
283        "XDG_DATA_HOME",
284        "XDG_RUNTIME_DIR",
285        "ANTHROPIC_API_KEY",
286        "CLAUDE_CONFIG_DIR",
287        "TMPDIR",
288        "TMP",
289        "TEMP",
290        "DYLD_FALLBACK_LIBRARY_PATH",
291    ] {
292        if let Ok(val) = std::env::var(var) {
293            cmd.env(var, val);
294        }
295    }
296
297    #[cfg(windows)]
298    for var in &[
299        "LOCALAPPDATA",
300        "APPDATA",
301        "USERPROFILE",
302        "SystemRoot",
303        "COMSPEC",
304        "PATHEXT",
305        "HOMEPATH",
306        "HOMEDRIVE",
307    ] {
308        if let Ok(val) = std::env::var(var) {
309            cmd.env(var, val);
310        }
311    }
312
313    cmd.arg("-p")
314        .arg(EXTRACTION_PROMPT)
315        .arg("--output-format")
316        .arg("json")
317        .arg("--json-schema")
318        .arg(EXTRACTION_SCHEMA)
319        .arg("--max-turns")
320        .arg("3")
321        .arg("--no-session-persistence");
322
323    if std::env::var("ANTHROPIC_API_KEY").is_ok() {
324        cmd.arg("--bare");
325    } else {
326        cmd.arg("--dangerously-skip-permissions")
327            .arg("--settings")
328            .arg(r#"{"hooks":{}}"#);
329    }
330
331    if let Some(m) = model {
332        cmd.arg("--model").arg(m);
333    }
334
335    cmd.stdin(Stdio::piped())
336        .stdout(Stdio::piped())
337        .stderr(Stdio::piped());
338
339    let mut child = cmd.spawn().map_err(|e| {
340        AppError::Io(std::io::Error::new(
341            e.kind(),
342            format!("failed to spawn claude: {e}"),
343        ))
344    })?;
345
346    let stdin_data = file_content.to_vec();
347    let mut child_stdin = child
348        .stdin
349        .take()
350        .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
351    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
352        child_stdin.write_all(&stdin_data)?;
353        Ok(())
354    });
355
356    let timeout = std::time::Duration::from_secs(timeout_secs);
357    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
358
359    match status {
360        Some(exit_status) => {
361            stdin_thread
362                .join()
363                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
364                .map_err(AppError::Io)?;
365
366            let mut stdout_buf = Vec::new();
367            let mut stderr_buf = Vec::new();
368            if let Some(mut out) = child.stdout.take() {
369                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
370            }
371            if let Some(mut err) = child.stderr.take() {
372                std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
373            }
374
375            if !exit_status.success() {
376                let stdout_str = String::from_utf8_lossy(&stdout_buf);
377                if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
378                {
379                    if let Some(re) = elements
380                        .iter()
381                        .find(|e| e.r#type.as_deref() == Some("result"))
382                    {
383                        if re.terminal_reason.as_deref() == Some("max_turns") {
384                            tracing::warn!(
385                                target: "ingest",
386                                "extraction hit max_turns limit — hooks may have consumed turns"
387                            );
388                            return Err(AppError::Validation(
389                                "claude -p hit max_turns: hooks may be consuming turns".into(),
390                            ));
391                        }
392                        if re.is_error {
393                            let err_msg = re
394                                .error
395                                .as_deref()
396                                .or(re.result.as_deref())
397                                .unwrap_or("unknown error");
398                            if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
399                                return Err(AppError::Validation(format!(
400                                    "RATE_LIMITED: {err_msg}"
401                                )));
402                            }
403                            if err_msg.contains("Not logged in")
404                                || err_msg.contains("authentication")
405                            {
406                                tracing::warn!(
407                                    target: "ingest",
408                                    "Claude Code authentication failed. Re-authenticate interactively with: claude"
409                                );
410                            }
411                            return Err(AppError::Validation(format!(
412                                "claude -p failed: {err_msg}"
413                            )));
414                        }
415                    }
416                }
417                let stderr_str = String::from_utf8_lossy(&stderr_buf);
418                if stderr_str.contains("auth") || stderr_str.contains("login") {
419                    tracing::warn!(
420                        target: "ingest",
421                        "Claude Code authentication may have failed. Re-authenticate with: claude"
422                    );
423                }
424                return Err(AppError::Validation(format!(
425                    "claude -p exited with code {:?}: {}",
426                    exit_status.code(),
427                    stderr_str.trim()
428                )));
429            }
430
431            let stdout = String::from_utf8(stdout_buf)
432                .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
433            parse_claude_output(&stdout)
434        }
435        None => {
436            tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
437            let _ = child.kill();
438            let _ = child.wait();
439            let _ = stdin_thread.join();
440            Err(AppError::Validation(format!(
441                "claude -p timed out after {timeout_secs} seconds"
442            )))
443        }
444    }
445}
446
447/// Parses the JSON array output from `claude -p --output-format json`.
448///
449/// Returns `(extraction, cost_usd, is_oauth)` where `is_oauth` is true when
450/// the init element reports `apiKeySource: "none"` (OAuth subscription).
451fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64, bool), AppError> {
452    let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
453        AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
454    })?;
455
456    let is_oauth = elements
457        .iter()
458        .find(|e| e.r#type.as_deref() == Some("system") && e.subtype.as_deref() == Some("init"))
459        .and_then(|e| e.api_key_source.as_deref())
460        .map(|s| s == "none")
461        .unwrap_or(false);
462
463    let result_elem = elements
464        .iter()
465        .find(|e| e.r#type.as_deref() == Some("result"))
466        .ok_or_else(|| {
467            AppError::Validation("claude output missing 'result' element".to_string())
468        })?;
469
470    if result_elem.is_error {
471        let err_msg = result_elem
472            .error
473            .as_deref()
474            .or(result_elem.result.as_deref())
475            .unwrap_or("unknown error");
476        if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
477            return Err(AppError::Validation(format!("RATE_LIMITED: {err_msg}")));
478        }
479        return Err(AppError::Validation(format!(
480            "claude extraction failed: {err_msg}"
481        )));
482    }
483
484    let extraction = result_elem
485        .structured_output
486        .clone()
487        .or_else(|| {
488            result_elem
489                .result
490                .as_ref()
491                .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
492        })
493        .ok_or_else(|| {
494            AppError::Validation("claude result missing structured_output and result field".into())
495        })?;
496
497    let cost = result_elem.total_cost_usd.unwrap_or(0.0);
498
499    Ok((extraction, cost, is_oauth))
500}
501
502fn emit_json<T: Serialize>(value: &T) {
503    if let Ok(json) = serde_json::to_string(value) {
504        let stdout = std::io::stdout();
505        let mut lock = stdout.lock();
506        let _ = writeln!(lock, "{json}");
507        let _ = lock.flush();
508    }
509}
510
511/// Collects files matching the pattern (reuses ingest logic).
512fn collect_matching_files(
513    dir: &Path,
514    pattern: &str,
515    recursive: bool,
516    max_files: usize,
517) -> Result<Vec<PathBuf>, AppError> {
518    let mut files = Vec::new();
519    super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
520    files.sort();
521
522    if files.len() > max_files {
523        return Err(AppError::Validation(format!(
524            "found {} files, exceeds --max-files cap of {}",
525            files.len(),
526            max_files
527        )));
528    }
529
530    Ok(files)
531}
532
533/// Opens or creates the queue database for tracking ingest progress.
534fn open_queue_db(path: &str) -> Result<Connection, AppError> {
535    let conn = Connection::open(path)?;
536
537    conn.pragma_update(None, "journal_mode", "wal")?;
538
539    conn.execute_batch(
540        "CREATE TABLE IF NOT EXISTS queue (
541            id          INTEGER PRIMARY KEY AUTOINCREMENT,
542            file_path   TEXT NOT NULL UNIQUE,
543            name        TEXT,
544            status      TEXT NOT NULL DEFAULT 'pending',
545            memory_id   INTEGER,
546            entities    INTEGER DEFAULT 0,
547            rels        INTEGER DEFAULT 0,
548            error       TEXT,
549            cost_usd    REAL DEFAULT 0.0,
550            attempt     INTEGER DEFAULT 0,
551            elapsed_ms  INTEGER,
552            created_at  TEXT DEFAULT (datetime('now')),
553            done_at     TEXT
554        );
555        CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
556    )?;
557
558    Ok(conn)
559}
560
561/// Main entry point for `ingest --mode claude-code`.
562pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
563    let started = Instant::now();
564
565    if !args.dir.exists() {
566        return Err(AppError::Validation(format!(
567            "directory not found: {}",
568            args.dir.display()
569        )));
570    }
571
572    // Stage 1: Validate
573    let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
574    let version = validate_claude_version(&claude_binary)?;
575    tracing::info!(
576        target: "ingest",
577        binary = %claude_binary.display(),
578        version = %version,
579        "Claude Code binary validated"
580    );
581
582    emit_json(&PhaseEvent {
583        phase: "validate",
584        claude_path: claude_binary.to_str(),
585        version: Some(&version),
586        dir: None,
587        files_total: None,
588        files_new: None,
589        files_existing: None,
590    });
591
592    // Stage 2: Scan
593    let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
594
595    let queue_conn = open_queue_db(&args.queue_db)?;
596
597    if args.resume {
598        let reset = queue_conn
599            .execute(
600                "UPDATE queue SET status='pending' WHERE status='processing'",
601                [],
602            )
603            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
604        if reset > 0 {
605            tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
606        }
607    }
608
609    if args.retry_failed {
610        let count = queue_conn
611            .execute(
612                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
613                [],
614            )
615            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
616        tracing::info!(target: "ingest", count, "retrying failed files");
617    }
618
619    if !args.resume && !args.retry_failed {
620        queue_conn
621            .execute("DELETE FROM queue", [])
622            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
623    }
624
625    let mut new_count = 0usize;
626    let mut existing_count = 0usize;
627
628    if !args.retry_failed {
629        for file in &files {
630            let file_str = file.to_string_lossy().to_string();
631            let inserted = queue_conn
632                .execute(
633                    "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
634                    rusqlite::params![file_str],
635                )
636                .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
637            if inserted > 0 {
638                new_count += 1;
639            } else {
640                existing_count += 1;
641            }
642        }
643    }
644
645    emit_json(&PhaseEvent {
646        phase: "scan",
647        claude_path: None,
648        version: None,
649        dir: args.dir.to_str(),
650        files_total: Some(files.len()),
651        files_new: Some(new_count),
652        files_existing: Some(existing_count),
653    });
654
655    if args.dry_run {
656        for (idx, file) in files.iter().enumerate() {
657            let (name, _truncated, _orig) =
658                super::ingest::derive_kebab_name(file, args.max_name_length);
659            emit_json(&FileEvent {
660                file: &file.to_string_lossy(),
661                name: &name,
662                status: "preview",
663                memory_id: None,
664                entities: None,
665                rels: None,
666                cost_usd: None,
667                elapsed_ms: None,
668                error: None,
669                index: idx,
670                total: files.len(),
671            });
672        }
673        emit_json(&Summary {
674            summary: true,
675            files_total: files.len(),
676            completed: 0,
677            failed: 0,
678            skipped: 0,
679            entities_total: 0,
680            rels_total: 0,
681            cost_usd: 0.0,
682            elapsed_ms: started.elapsed().as_millis() as u64,
683        });
684        if !args.keep_queue {
685            let _ = std::fs::remove_file(&args.queue_db);
686        }
687        return Ok(());
688    }
689
690    // Stage 3: Process
691    let paths = AppPaths::resolve(args.db.as_deref())?;
692    ensure_db_ready(&paths)?;
693    let conn = open_rw(&paths.db)?;
694    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
695    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
696    let memory_type_str = args.r#type.as_str().to_string();
697
698    let mut completed = 0usize;
699    let mut failed = 0usize;
700    let skipped_initial: usize = queue_conn
701        .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
702            r.get::<_, usize>(0)
703        })
704        .unwrap_or(0);
705    let mut skipped = skipped_initial;
706    let mut entities_total = 0usize;
707    let mut rels_total = 0usize;
708    let mut cost_total = 0.0f64;
709    let mut oauth_detected = false;
710    let total = files.len();
711
712    let mut backoff_secs = args.rate_limit_wait;
713
714    loop {
715        let pending: Option<(i64, String)> = queue_conn
716            .query_row(
717                "UPDATE queue SET status='processing', attempt=attempt+1 \
718                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
719                 RETURNING id, file_path",
720                [],
721                |row| Ok((row.get(0)?, row.get(1)?)),
722            )
723            .ok();
724
725        let (queue_id, file_path) = match pending {
726            Some(p) => p,
727            None => break,
728        };
729
730        let file_started = Instant::now();
731
732        // G05: reject files that exceed the 10 MB stdin limit
733        const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
734        if let Ok(meta) = std::fs::metadata(&file_path) {
735            if meta.len() > MAX_FILE_SIZE {
736                let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
737                let _ = queue_conn.execute(
738                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
739                    rusqlite::params![err_msg, queue_id],
740                );
741                let current_index = completed + failed + skipped;
742                failed += 1;
743                emit_json(&FileEvent {
744                    file: &file_path,
745                    name: "",
746                    status: "failed",
747                    memory_id: None,
748                    entities: None,
749                    rels: None,
750                    cost_usd: None,
751                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
752                    error: Some(&err_msg),
753                    index: current_index,
754                    total,
755                });
756                if args.fail_fast {
757                    break;
758                }
759                continue;
760            }
761        }
762
763        let file_content = match std::fs::read(&file_path) {
764            Ok(c) => c,
765            Err(e) => {
766                let err_msg = format!("IO error: {e}");
767                let _ = queue_conn.execute(
768                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
769                    rusqlite::params![err_msg, queue_id],
770                );
771                let current_index = completed + failed + skipped;
772                failed += 1;
773                emit_json(&FileEvent {
774                    file: &file_path,
775                    name: "",
776                    status: "failed",
777                    memory_id: None,
778                    entities: None,
779                    rels: None,
780                    cost_usd: None,
781                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
782                    error: Some(&err_msg),
783                    index: current_index,
784                    total,
785                });
786                if args.fail_fast {
787                    break;
788                }
789                continue;
790            }
791        };
792
793        // B08: skip files exceeding body cap BEFORE sending to LLM to avoid wasting tokens
794        if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
795            let err_msg = format!(
796                "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
797                crate::constants::MAX_MEMORY_BODY_LEN,
798                file_content.len()
799            );
800            tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
801            let _ = queue_conn.execute(
802                "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
803                rusqlite::params![err_msg, queue_id],
804            );
805            let current_index = completed + failed + skipped;
806            skipped += 1;
807            emit_json(&FileEvent {
808                file: &file_path,
809                name: "",
810                status: "skipped",
811                memory_id: None,
812                entities: None,
813                rels: None,
814                cost_usd: None,
815                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
816                error: Some(&err_msg),
817                index: current_index,
818                total,
819            });
820            continue;
821        }
822
823        // B07: retry once on cold-start failure (Claude Code Issue #23265)
824        let max_extract_attempts: u32 = 2;
825        let mut extraction_result: Option<(ExtractionResult, f64, bool)> = None;
826        let mut last_extract_err: Option<String> = None;
827
828        for attempt in 1..=max_extract_attempts {
829            match extract_with_claude(
830                &claude_binary,
831                &file_content,
832                args.claude_model.as_deref(),
833                args.claude_timeout,
834            ) {
835                Ok(result) => {
836                    extraction_result = Some(result);
837                    break;
838                }
839                Err(ref e) if format!("{e}").contains("RATE_LIMITED") => {
840                    last_extract_err = Some(format!("{e}"));
841                    break;
842                }
843                Err(e) => {
844                    let msg = format!("{e}");
845                    if attempt < max_extract_attempts {
846                        tracing::warn!(target: "ingest", attempt, error = %msg, "extraction failed, retrying (cold-start workaround)");
847                        std::thread::sleep(std::time::Duration::from_secs(2));
848                    }
849                    last_extract_err = Some(msg);
850                }
851            }
852        }
853
854        if let Some((extraction, cost, is_oauth)) = extraction_result {
855            if is_oauth && !oauth_detected {
856                oauth_detected = true;
857                tracing::info!(target: "ingest", "OAuth subscription detected — cost_usd omitted from output");
858            }
859            backoff_secs = args.rate_limit_wait;
860
861            let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
862                std::path::Path::new(&extraction.name),
863                args.max_name_length,
864            );
865            let name = &normalized_name;
866            let ent_count = extraction.entities.len();
867            let rel_count = extraction.relationships.len();
868
869            let new_entities: Vec<NewEntity> = extraction
870                .entities
871                .iter()
872                .filter_map(|e| match e.entity_type.parse::<EntityType>() {
873                    Ok(et) => Some(NewEntity {
874                        name: e.name.clone(),
875                        entity_type: et,
876                        description: None,
877                    }),
878                    Err(_) => {
879                        tracing::warn!(
880                            target: "ingest",
881                            entity = %e.name,
882                            entity_type = %e.entity_type,
883                            "entity type not recognized, skipping"
884                        );
885                        None
886                    }
887                })
888                .collect();
889
890            let new_relationships: Vec<NewRelationship> = extraction
891                .relationships
892                .iter()
893                .map(|r| NewRelationship {
894                    source: r.source.clone(),
895                    target: r.target.clone(),
896                    relation: crate::parsers::normalize_relation(&r.relation),
897                    strength: r.strength,
898                    description: None,
899                })
900                .collect();
901
902            let body_str = String::from_utf8_lossy(&file_content);
903            let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
904            let new_memory = NewMemory {
905                name: name.clone(),
906                namespace: namespace.clone(),
907                memory_type: memory_type_str.clone(),
908                description: extraction.description.clone(),
909                body: body_str.to_string(),
910                body_hash,
911                session_id: None,
912                source: "agent".to_string(),
913                metadata: serde_json::Value::Object(serde_json::Map::new()),
914            };
915
916            // B06: deduplication — update existing memory instead of failing on UNIQUE
917            let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
918                Some((existing_id, is_deleted)) => {
919                    if is_deleted {
920                        memories::clear_deleted_at(&conn, existing_id)?;
921                    }
922                    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
923                        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
924                        rusqlite::params![existing_id],
925                        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
926                    )?;
927                    memories::update(&conn, existing_id, &new_memory, None)?;
928                    memories::sync_fts_after_update(
929                        &conn,
930                        existing_id,
931                        &old_name,
932                        &old_desc,
933                        &old_body,
934                        &new_memory.name,
935                        &new_memory.description,
936                        &new_memory.body,
937                    )?;
938                    tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
939                    existing_id
940                }
941                None => match memories::insert(&conn, &new_memory) {
942                    Ok(id) => id,
943                    Err(e) => {
944                        let err_msg = format!("{e}");
945                        let _ = queue_conn.execute(
946                                "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
947                                rusqlite::params![err_msg, queue_id],
948                            );
949                        let current_index = completed + failed + skipped;
950                        failed += 1;
951                        emit_json(&FileEvent {
952                            file: &file_path,
953                            name,
954                            status: "failed",
955                            memory_id: None,
956                            entities: None,
957                            rels: None,
958                            cost_usd: if is_oauth { None } else { Some(cost) },
959                            elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
960                            error: Some(&err_msg),
961                            index: current_index,
962                            total,
963                        });
964                        if !is_oauth {
965                            cost_total += cost;
966                        }
967                        if args.fail_fast {
968                            break;
969                        }
970                        continue;
971                    }
972                },
973            };
974
975            for ent in &new_entities {
976                match entities::upsert_entity(&conn, &namespace, ent) {
977                    Ok(eid) => {
978                        let _ = entities::link_memory_entity(&conn, memory_id, eid);
979                    }
980                    Err(e) => {
981                        tracing::warn!(
982                            target: "ingest",
983                            entity = %ent.name,
984                            error = %e,
985                            "entity skipped due to validation error"
986                        );
987                    }
988                }
989            }
990            for rel in &new_relationships {
991                crate::parsers::warn_if_non_canonical(&rel.relation);
992                let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
993                let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
994                if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
995                    let _ = conn.execute(
996                        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
997                        rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
998                    );
999                }
1000            }
1001
1002            // G01: embedding pipeline — enables recall to find memories created via --mode claude-code
1003            let body_text = String::from_utf8_lossy(&file_content).into_owned();
1004            let snippet: String = body_text.chars().take(200).collect();
1005            let chunks_info =
1006                crate::chunking::split_into_chunks_hierarchical(&body_text, tokenizer);
1007
1008            let embedding_result = if chunks_info.len() <= 1 {
1009                crate::daemon::embed_passage_or_local(&paths.models, &body_text)
1010            } else {
1011                let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
1012                let mut multi_ok = true;
1013                for chunk in &chunks_info {
1014                    let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
1015                    match crate::daemon::embed_passage_or_local(&paths.models, chunk_text) {
1016                        Ok(emb) => chunk_embeddings.push(emb),
1017                        Err(e) => {
1018                            tracing::warn!(
1019                                target: "ingest",
1020                                file = %file_path,
1021                                error = %e,
1022                                "chunk embedding failed, skipping vector index for this file"
1023                            );
1024                            multi_ok = false;
1025                            break;
1026                        }
1027                    }
1028                }
1029                if multi_ok {
1030                    let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
1031                    // persist per-chunk vectors
1032                    if let Err(e) = crate::storage::chunks::insert_chunk_slices(
1033                        &conn,
1034                        memory_id,
1035                        &body_text,
1036                        &chunks_info,
1037                    ) {
1038                        tracing::warn!(
1039                            target: "ingest",
1040                            file = %file_path,
1041                            error = %e,
1042                            "chunk slice insert failed"
1043                        );
1044                    } else {
1045                        for (i, emb) in chunk_embeddings.iter().enumerate() {
1046                            if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
1047                                &conn, i as i64, memory_id, i as i32, emb,
1048                            ) {
1049                                tracing::warn!(
1050                                    target: "ingest",
1051                                    file = %file_path,
1052                                    chunk = i,
1053                                    error = %e,
1054                                    "chunk vec upsert failed"
1055                                );
1056                            }
1057                        }
1058                    }
1059                    Ok(aggregated)
1060                } else {
1061                    // fallback: embed whole body for the memory-level vector
1062                    crate::daemon::embed_passage_or_local(&paths.models, &body_text)
1063                }
1064            };
1065
1066            match embedding_result {
1067                Ok(embedding) => {
1068                    if let Err(e) = memories::upsert_vec(
1069                        &conn,
1070                        memory_id,
1071                        &namespace,
1072                        &memory_type_str,
1073                        &embedding,
1074                        name,
1075                        &snippet,
1076                    ) {
1077                        tracing::warn!(
1078                            target: "ingest",
1079                            file = %file_path,
1080                            error = %e,
1081                            "memory vec upsert failed; recall may not find this memory"
1082                        );
1083                    }
1084                    // embed each entity that was successfully upserted
1085                    for ent in &new_entities {
1086                        if let Ok(Some(eid)) =
1087                            entities::find_entity_id(&conn, &namespace, &ent.name)
1088                        {
1089                            let entity_text = ent.name.clone();
1090                            match crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
1091                            {
1092                                Ok(emb) => {
1093                                    if let Err(e) = entities::upsert_entity_vec(
1094                                        &conn,
1095                                        eid,
1096                                        &namespace,
1097                                        ent.entity_type,
1098                                        &emb,
1099                                        &ent.name,
1100                                    ) {
1101                                        tracing::warn!(
1102                                            target: "ingest",
1103                                            entity = %ent.name,
1104                                            error = %e,
1105                                            "entity vec upsert failed"
1106                                        );
1107                                    }
1108                                }
1109                                Err(e) => {
1110                                    tracing::warn!(
1111                                        target: "ingest",
1112                                        entity = %ent.name,
1113                                        error = %e,
1114                                        "entity embedding failed"
1115                                    );
1116                                }
1117                            }
1118                        }
1119                    }
1120                }
1121                Err(e) => {
1122                    tracing::warn!(
1123                        target: "ingest",
1124                        file = %file_path,
1125                        error = %e,
1126                        "memory embedding failed; recall will not find this memory"
1127                    );
1128                }
1129            }
1130
1131            let _ = queue_conn.execute(
1132                "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1133                rusqlite::params![
1134                    name,
1135                    memory_id,
1136                    ent_count,
1137                    rel_count,
1138                    cost,
1139                    file_started.elapsed().as_millis() as i64,
1140                    queue_id
1141                ],
1142            );
1143
1144            let current_index = completed + failed + skipped;
1145            completed += 1;
1146            entities_total += ent_count;
1147            rels_total += rel_count;
1148            if !is_oauth {
1149                cost_total += cost;
1150            }
1151
1152            emit_json(&FileEvent {
1153                file: &file_path,
1154                name,
1155                status: "done",
1156                memory_id: Some(memory_id),
1157                entities: Some(ent_count),
1158                rels: Some(rel_count),
1159                cost_usd: if is_oauth { None } else { Some(cost) },
1160                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1161                error: None,
1162                index: current_index,
1163                total,
1164            });
1165        } else if let Some(ref err_str) = last_extract_err {
1166            if err_str.contains("RATE_LIMITED") {
1167                tracing::warn!(
1168                    target: "ingest",
1169                    wait_seconds = backoff_secs,
1170                    "rate limited, waiting before retry"
1171                );
1172                let _ = queue_conn.execute(
1173                    "UPDATE queue SET status='pending' WHERE id=?1",
1174                    rusqlite::params![queue_id],
1175                );
1176                std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
1177                backoff_secs = (backoff_secs * 2).min(900);
1178                continue;
1179            } else {
1180                let _ = queue_conn.execute(
1181                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1182                    rusqlite::params![err_str, queue_id],
1183                );
1184                let current_index = completed + failed + skipped;
1185                failed += 1;
1186                emit_json(&FileEvent {
1187                    file: &file_path,
1188                    name: "",
1189                    status: "failed",
1190                    memory_id: None,
1191                    entities: None,
1192                    rels: None,
1193                    cost_usd: None,
1194                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1195                    error: Some(err_str),
1196                    index: current_index,
1197                    total,
1198                });
1199                if args.fail_fast {
1200                    break;
1201                }
1202            }
1203        }
1204
1205        if let Some(budget) = args.max_cost_usd {
1206            if oauth_detected {
1207                tracing::debug!(target: "ingest", "--max-cost-usd ignored: OAuth subscription detected");
1208            } else if cost_total >= budget {
1209                tracing::warn!(
1210                    target: "ingest",
1211                    spent = cost_total,
1212                    budget = budget,
1213                    "budget exceeded, stopping"
1214                );
1215                break;
1216            }
1217        }
1218    }
1219
1220    // Stage 4: Summary
1221    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1222    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1223
1224    emit_json(&Summary {
1225        summary: true,
1226        files_total: total,
1227        completed,
1228        failed,
1229        skipped,
1230        entities_total,
1231        rels_total,
1232        cost_usd: cost_total,
1233        elapsed_ms: started.elapsed().as_millis() as u64,
1234    });
1235
1236    if !args.keep_queue && failed == 0 {
1237        let _ = std::fs::remove_file(&args.queue_db);
1238    }
1239
1240    Ok(())
1241}
1242
#[cfg(test)]
mod tests {
    //! Unit tests for the extraction schema constant, `parse_claude_output`
    //! and `find_claude_binary`.

    use super::*;

    /// Restores one environment variable to the state it had at capture time.
    ///
    /// The restore runs in `Drop`, so it also happens when the test body
    /// panics, and it removes the variable again if it was originally unset.
    struct EnvVarGuard {
        key: &'static str,
        original: Option<std::ffi::OsString>,
    }

    impl EnvVarGuard {
        /// Records the current value (or absence) of `key`.
        fn capture(key: &'static str) -> Self {
            Self {
                key,
                original: std::env::var_os(key),
            }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.original.take() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// The embedded JSON schema must at least be syntactically valid JSON.
    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// A successful result event with `structured_output` yields the parsed
    /// extraction and the reported cost.
    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost, _is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    /// An error result must surface the text of its `error` field.
    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(err.to_string().contains("authentication failed"));
    }

    /// A rate-limit error must be reported with the `RATE_LIMITED` marker.
    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(err.to_string().contains("RATE_LIMITED"));
    }

    /// Input that is not JSON at all must be rejected.
    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    /// With an unusable `PATH` and no binary override variable, binary lookup
    /// must fail.
    ///
    /// Both variables are captured first and restored by the guards once the
    /// lookup has returned (or panicked), so the changes do not outlive this
    /// test.
    /// NOTE(review): the environment is process-wide, so tests running in
    /// parallel can still observe the temporary values while this test runs.
    #[test]
    fn test_find_claude_binary_not_found() {
        let result = {
            let _path_guard = EnvVarGuard::capture("PATH");
            let _binary_guard = EnvVarGuard::capture("SQLITE_GRAPHRAG_CLAUDE_BINARY");
            std::env::set_var("PATH", "/nonexistent");
            std::env::remove_var("SQLITE_GRAPHRAG_CLAUDE_BINARY");
            find_claude_binary(None)
        };
        assert!(result.is_err());
    }

    /// When `structured_output` is null, the JSON string in `result` is parsed
    /// instead.
    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost, _is_oauth) =
            parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    /// An error result that carries its message in `result` (no `error`
    /// field) must still surface that message.
    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    /// A result with `terminal_reason` set to `max_turns` but `is_error`
    /// false is still parsed successfully.
    #[test]
    fn test_terminal_reason_max_turns_detected() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"terminal_reason":"max_turns","structured_output":{"name":"t","description":"d","entities":[],"relationships":[]}}
        ]"#;
        let parsed = parse_claude_output(output);
        assert!(
            parsed.is_ok(),
            "max_turns in result without is_error should still parse"
        );
    }

    /// `apiKeySource` of `none` in the init event is reported as OAuth, and
    /// the cost is still returned.
    #[test]
    fn test_detect_oauth_from_init_json() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.50,"structured_output":{"name":"test-oauth","description":"oauth test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(is_oauth, "apiKeySource=none must be detected as OAuth");
        assert!((cost - 0.50).abs() < f64::EPSILON);
    }

    /// `apiKeySource` of `env` must not be reported as OAuth.
    #[test]
    fn test_api_key_source_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"env"},
            {"type":"result","is_error":false,"total_cost_usd":0.10,"structured_output":{"name":"test-api","description":"api test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "apiKeySource=env must NOT be detected as OAuth");
    }

    /// An init event without `apiKeySource` must default to not OAuth.
    #[test]
    fn test_missing_api_key_source_defaults_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.05,"structured_output":{"name":"test-missing","description":"missing test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "missing apiKeySource must default to not OAuth");
    }

    /// Every `entity_type` listed in the schema enum must parse as an
    /// `EntityType`, so the schema cannot drift from the Rust enum unnoticed.
    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value = serde_json::from_str(EXTRACTION_SCHEMA).unwrap();
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t.as_str().unwrap();
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}