Skip to main content

sqlite_graphrag/commands/
ingest_claude.rs

1//! Handler for `ingest --mode claude-code`.
2//!
3//! Orchestrates the locally installed Claude Code CLI binary (`claude -p`)
4//! to extract domain-specific entities and relationships from each file,
5//! then persists them via the same pipeline as `remember --graph-stdin`.
6//!
7//! Architecture: P1 One-Shot per file — each file spawns a separate
8//! `claude -p` process with `--json-schema` for guaranteed structured output.
9//! A SQLite queue DB tracks progress for resume/retry support.
10// Workload: Subprocess I/O-bound (claude -p headless with network wait)
11
12use crate::commands::ingest::IngestArgs;
13use crate::entity_type::EntityType;
14use crate::errors::AppError;
15use crate::paths::AppPaths;
16use crate::spawn::env_whitelist::apply_env_whitelist;
17use crate::storage::connection::{ensure_db_ready, open_rw};
18use crate::storage::entities::{self, NewEntity, NewRelationship};
19use crate::storage::memories::{self, NewMemory};
20
21use rusqlite::Connection;
22use serde::{Deserialize, Serialize};
23use std::io::Write;
24use std::path::{Path, PathBuf};
25use std::process::{Command, Stdio};
26use std::time::Instant;
27
/// Oldest Claude Code CLI version accepted by `validate_claude_version`.
///
/// Compared numerically as `MAJOR.MINOR.PATCH` against the leading token of
/// `claude --version`.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
29
/// JSON Schema handed to `claude -p --json-schema` so the CLI returns a
/// `structured_output` payload that deserializes into [`ExtractionResult`].
///
/// - `entity_type` and `relation` are closed enums; the `relation` list is
///   duplicated in the rules of [`EXTRACTION_PROMPT`] and the two must be
///   kept in sync by hand.
/// - `additionalProperties: false` rejects unknown keys at every object level.
/// - `strength` is constrained to the closed range `[0, 1]`.
///
/// The text is passed verbatim as a command-line argument.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
71
/// Prompt passed as the argument to `claude -p`; the document to analyse is
/// written to the child's stdin separately.
///
/// Each line ends in `\n\`. That keeps an explicit newline in the literal,
/// while Rust's line-continuation escape drops the next source line's leading
/// whitespace.
///
/// NOTE(review): the "Relationship types MUST be one of" rule duplicates the
/// `relation` enum in [`EXTRACTION_SCHEMA`], so keep both lists in sync. Item 3
/// lists fewer entity kinds than the schema's `entity_type` enum, which also
/// allows incident, organization, location and date. Confirm whether that is
/// intentional.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
85
/// One element of the JSON array printed by `claude -p --output-format json`.
///
/// Every field is optional (or defaulted) because different element kinds
/// carry different keys. Only the keys this module reads are modelled; serde
/// ignores the others because `deny_unknown_fields` is not set.
#[derive(Debug, Deserialize)]
struct ClaudeOutputElement {
    /// Element kind; this module looks for `"system"` and `"result"`.
    r#type: Option<String>,
    /// Sub-kind; `"init"` marks the system element that carries `apiKeySource`.
    subtype: Option<String>,
    /// Error flag on the `result` element; a missing key deserializes as `false`.
    #[serde(default)]
    is_error: bool,
    /// Schema-validated payload produced by `--json-schema`.
    structured_output: Option<ExtractionResult>,
    /// Free-form result text. Used as error text when `error` is absent, and
    /// parsed as JSON when `structured_output` is missing.
    result: Option<String>,
    /// Reported run cost in USD; `parse_claude_output` treats absence as 0.0.
    total_cost_usd: Option<f64>,
    /// Error message reported by the CLI, if any.
    error: Option<String>,
    /// Reason the run stopped; `"max_turns"` is reported as its own error.
    terminal_reason: Option<String>,
    /// Credential source reported on the init element; `"none"` is
    /// interpreted by `parse_claude_output` as an OAuth session.
    #[serde(rename = "apiKeySource")]
    api_key_source: Option<String>,
}
100
/// Structured extraction for one document, shaped by `EXTRACTION_SCHEMA`.
///
/// Deserialized either from Claude's `structured_output` or, as a fallback,
/// from JSON in the `result` text.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Kebab-case name suggested by the model for the document.
    pub name: String,
    /// One-sentence summary of the document.
    pub description: String,
    /// Entities extracted from the document.
    pub entities: Vec<ExtractedEntity>,
    /// Typed edges between entities.
    pub relationships: Vec<ExtractedRelationship>,
}

/// A single extracted entity.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedEntity {
    /// Entity name (the prompt asks for lowercase kebab-case).
    pub name: String,
    /// Raw type string. The schema restricts it to its `entity_type` enum,
    /// but nothing in this struct enforces that on deserialization.
    pub entity_type: String,
}

/// A single extracted relationship.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedRelationship {
    /// Source endpoint; presumably an entity `name` — TODO confirm against the persisting code.
    pub source: String,
    /// Target endpoint; presumably an entity `name` — TODO confirm.
    pub target: String,
    /// Relation type; the schema restricts it to its `relation` enum.
    pub relation: String,
    /// Edge weight; the schema constrains it to `[0, 1]`, and it is not
    /// re-validated here.
    pub strength: f64,
}
122
/// Progress record for a pipeline phase, emitted as one JSON line via `emit_json`.
///
/// `None` fields are omitted from the output. In this file the `"validate"`
/// phase fills `claude_path`/`version`, and the `"scan"` phase fills
/// `dir` and the `files_*` counters.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    phase: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    claude_path: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}

/// Per-file progress record, emitted as one JSON line via `emit_json`.
///
/// `status` values produced in this file include `"preview"`, `"failed"`
/// and `"skipped"`. `index` is 0-based and `total` is the number of scanned
/// files. `None` fields are omitted from the output.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    file: &'a str,
    name: &'a str,
    status: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    index: usize,
    total: usize,
}

/// Final run totals, emitted as one JSON line at the end of an ingest.
///
/// `summary` is set to `true` wherever this file builds one, presumably so
/// line-oriented consumers can tell it apart from `FileEvent` lines.
#[derive(Debug, Serialize)]
struct Summary {
    summary: bool,
    files_total: usize,
    completed: usize,
    failed: usize,
    skipped: usize,
    entities_total: usize,
    rels_total: usize,
    cost_usd: f64,
    elapsed_ms: u64,
}
173
174/// Locates the Claude Code binary on the system.
175pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
176    if let Some(p) = explicit {
177        if p.exists() {
178            return Ok(p.to_path_buf());
179        }
180        return Err(AppError::Validation(format!(
181            "Claude Code binary not found at explicit path: {}",
182            p.display()
183        )));
184    }
185
186    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
187        let p = PathBuf::from(&env_path);
188        if p.exists() {
189            return Ok(p);
190        }
191    }
192
193    let name = if cfg!(windows) {
194        "claude.exe"
195    } else {
196        "claude"
197    };
198    if let Some(path_var) = std::env::var_os("PATH") {
199        for dir in std::env::split_paths(&path_var) {
200            let candidate = dir.join(name);
201            if candidate.exists() {
202                return Ok(candidate);
203            }
204        }
205    }
206
207    Err(AppError::Validation(
208        "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
209    ))
210}
211
212/// Validates that the Claude Code binary meets the minimum version.
213fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
214    let output = Command::new(binary)
215        .arg("--version")
216        .stdin(Stdio::null())
217        .stdout(Stdio::piped())
218        .stderr(Stdio::piped())
219        .output()
220        .map_err(AppError::Io)?;
221
222    if !output.status.success() {
223        return Err(AppError::Validation(
224            "failed to run 'claude --version'".to_string(),
225        ));
226    }
227
228    let version_str = String::from_utf8(output.stdout)
229        .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
230    let version = version_str.trim().to_string();
231
232    // Extract the numeric version part before first space or paren, e.g. "2.1.149 (Claude Code)" -> "2.1.149"
233    let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
234
235    fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
236        let parts: Vec<&str> = s.splitn(3, '.').collect();
237        if parts.len() < 2 {
238            return None;
239        }
240        let major = parts[0].parse::<u64>().ok()?;
241        let minor = parts[1].parse::<u64>().ok()?;
242        let patch = parts
243            .get(2)
244            .and_then(|p| p.parse::<u64>().ok())
245            .unwrap_or(0);
246        Some((major, minor, patch))
247    }
248
249    if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
250        if actual < min {
251            return Err(AppError::Validation(format!(
252                "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
253            )));
254        }
255    }
256
257    Ok(version)
258}
259
260/// Invokes `claude -p` for a single file and returns the extraction result.
261///
262/// OAuth-only enforcement (gaps.md:41-49, v1.0.69 mandate):
263///
264/// - `wait-timeout` for cross-platform subprocess timeout.
265/// - `env_clear()` for least-privilege environment.
266/// - OAuth-only flow: NO `--bare` (PROHIBITED, gaps.md:49), no API-key path.
267/// - Mandatory hardening: `--strict-mcp-config --mcp-config '{}'` to zero
268///   MCP servers, and `--settings '{"hooks":{}}'` to disable hooks.
269/// - If `ANTHROPIC_API_KEY` is set in the environment we ABORT the spawn
270///   (return a `false` command with a violation marker) — API-key path is
271///   PROHIBITED in this project.
272fn extract_with_claude(
273    binary: &Path,
274    file_content: &[u8],
275    model: Option<&str>,
276    timeout_secs: u64,
277) -> Result<(ExtractionResult, f64, bool), AppError> {
278    use wait_timeout::ChildExt;
279
280    // OAuth-only guard (gaps.md:47). If `ANTHROPIC_API_KEY` is set in the
281    // environment we MUST abort — that is the API-key path which is
282    // explicitly PROHIBITED. Use the OAuth flow exclusively.
283    if let Ok(_key) = std::env::var("ANTHROPIC_API_KEY") {
284        let mut cmd = Command::new("false");
285        cmd.env_clear();
286        cmd.env("PATH", "/nonexistent");
287        cmd.arg("--oauth-only-violation-anthropic-api-key-set");
288        return Err(AppError::Validation(
289            "ANTHROPIC_API_KEY is set in the environment; \
290             sqlite-graphrag operates exclusively with OAuth (Pro/Max) and \
291             the API-key path is PROHIBITED (gaps.md:47). Unset the variable \
292             and re-run with `claude login` already completed in this session."
293                .to_string(),
294        ));
295    }
296
297    let mut cmd = Command::new(binary);
298
299    // v1.0.83 (ADR-0041): env whitelist delegated to the shared helper.
300    // `ANTHROPIC_API_KEY` is INTENTIONALLY ABSENT (defence-in-depth).
301    apply_env_whitelist(&mut cmd, crate::spawn::env_whitelist::is_strict_env_clear());
302    crate::spawn::apply_cwd_isolation(&mut cmd)?;
303
304    // Canonical OAuth-only command line (gaps.md:201-208 + 211-213).
305    // `--bare` is PROHIBITED (gaps.md:49) — never emitted.
306    //
307    // GAP-META-005 (v1.0.87, ADR-0045): `--mcp-config '{}'` inline is
308    // rejected by Claude Code 2.1.177. Substitute the literal for a
309    // tempfile path containing `{"mcpServers":{}}`.
310    let mcp_config_path = crate::spawn::preflight::write_empty_mcp_config_tempfile()?;
311
312    cmd.arg("-p")
313        .arg(EXTRACTION_PROMPT)
314        .arg("--strict-mcp-config")
315        .arg("--mcp-config")
316        .arg(mcp_config_path.as_os_str())
317        .arg("--dangerously-skip-permissions")
318        .arg("--settings")
319        .arg(r#"{"hooks":{}}"#)
320        .arg("--output-format")
321        .arg("json")
322        .arg("--json-schema")
323        .arg(EXTRACTION_SCHEMA)
324        .arg("--max-turns")
325        .arg("7")
326        .arg("--no-session-persistence");
327
328    if let Some(m) = model {
329        cmd.arg("--model").arg(m);
330    }
331
332    cmd.stdin(Stdio::piped())
333        .stdout(Stdio::piped())
334        .stderr(Stdio::piped());
335
336    // GAP-META-005 (v1.0.87, ADR-0045): pre-flight gate runs after argv
337    // is fully built. Validates binary, argv-size, walk-up of `.mcp.json`,
338    // and `CLAUDE_CONFIG_DIR` cleanliness.
339    let argv_refs: Vec<std::ffi::OsString> = cmd.get_args().map(|s| s.to_os_string()).collect();
340    let preflight_args = crate::spawn::preflight::PreFlightArgs {
341        binary_path: binary,
342        argv: &argv_refs,
343        workspace_root: std::path::Path::new("."),
344        mcp_config_inline_json: None,
345        expected_output_bytes: 65_536,
346        spawner_name: "ingest_claude",
347    };
348    if let Err(e) = crate::spawn::preflight::preflight_check(&preflight_args) {
349        // v1.0.88 (BUG-6 fix, ADR-0046): propagate the structured
350        // `PreFlightError` via the `From` impl in `errors.rs` so callers
351        // receive `AppError::PreFlightFailed` (exit 16) instead of a
352        // bare `std::process::exit(16)` that discards the variant name,
353        // tracing context, and PT-BR i18n.
354        return Err(crate::errors::AppError::from(e));
355    }
356
357    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
358        AppError::Io(std::io::Error::new(
359            e.kind(),
360            format!("failed to spawn claude: {e}"),
361        ))
362    })?;
363
364    let stdin_data = file_content.to_vec();
365    let mut child_stdin = child
366        .stdin
367        .take()
368        .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
369    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
370        child_stdin.write_all(&stdin_data)?;
371        drop(child_stdin);
372        Ok(())
373    });
374
375    let start = std::time::Instant::now();
376    let timeout = std::time::Duration::from_secs(timeout_secs);
377    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
378
379    match status {
380        Some(exit_status) => {
381            stdin_thread
382                .join()
383                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
384                .map_err(AppError::Io)?;
385
386            tracing::debug!(
387                target: "process",
388                exit_code = ?exit_status.code(),
389                elapsed_ms = start.elapsed().as_millis() as u64,
390                "external process completed"
391            );
392
393            let mut stdout_buf = Vec::new();
394            let mut stderr_buf = Vec::new();
395            if let Some(mut out) = child.stdout.take() {
396                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
397            }
398            if let Some(mut err) = child.stderr.take() {
399                std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
400            }
401
402            if !exit_status.success() {
403                let stdout_str = String::from_utf8_lossy(&stdout_buf);
404                if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
405                {
406                    if let Some(re) = elements
407                        .iter()
408                        .find(|e| e.r#type.as_deref() == Some("result"))
409                    {
410                        if re.terminal_reason.as_deref() == Some("max_turns") {
411                            tracing::warn!(
412                                target: "ingest",
413                                "extraction hit max_turns limit — hooks may have consumed turns"
414                            );
415                            return Err(AppError::Validation(
416                                "claude -p hit max_turns: hooks may be consuming turns".into(),
417                            ));
418                        }
419                        if re.is_error {
420                            let err_msg = re
421                                .error
422                                .as_deref()
423                                .or(re.result.as_deref())
424                                .unwrap_or("unknown error");
425                            if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
426                                return Err(AppError::RateLimited {
427                                    detail: err_msg.to_string(),
428                                });
429                            }
430                            if err_msg.contains("Not logged in")
431                                || err_msg.contains("authentication")
432                            {
433                                tracing::warn!(
434                                    target: "ingest",
435                                    "Claude Code authentication failed. Re-authenticate interactively with: claude"
436                                );
437                            }
438                            return Err(AppError::Validation(format!(
439                                "claude -p failed: {err_msg}"
440                            )));
441                        }
442                    }
443                }
444                let stderr_str = String::from_utf8_lossy(&stderr_buf);
445                if stderr_str.contains("auth") || stderr_str.contains("login") {
446                    tracing::warn!(
447                        target: "ingest",
448                        "Claude Code authentication may have failed. Re-authenticate with: claude"
449                    );
450                }
451                return Err(AppError::Validation(format!(
452                    "claude -p exited with code {:?}: {}",
453                    exit_status.code(),
454                    stderr_str.trim()
455                )));
456            }
457
458            let stdout = String::from_utf8(stdout_buf)
459                .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
460            parse_claude_output(&stdout)
461        }
462        None => {
463            tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
464            let _ = child.kill();
465            let _ = child.wait();
466            let _ = stdin_thread.join();
467            Err(AppError::Validation(format!(
468                "claude -p timed out after {timeout_secs} seconds"
469            )))
470        }
471    }
472}
473
474/// Parses the JSON array output from `claude -p --output-format json`.
475///
476/// Returns `(extraction, cost_usd, is_oauth)` where `is_oauth` is true when
477/// the init element reports `apiKeySource: "none"` (OAuth subscription).
478fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64, bool), AppError> {
479    let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
480        AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
481    })?;
482
483    let is_oauth = elements
484        .iter()
485        .find(|e| e.r#type.as_deref() == Some("system") && e.subtype.as_deref() == Some("init"))
486        .and_then(|e| e.api_key_source.as_deref())
487        .map(|s| s == "none")
488        .unwrap_or(false);
489
490    let result_elem = elements
491        .iter()
492        .find(|e| e.r#type.as_deref() == Some("result"))
493        .ok_or_else(|| {
494            AppError::Validation("claude output missing 'result' element".to_string())
495        })?;
496
497    if result_elem.is_error {
498        let err_msg = result_elem
499            .error
500            .as_deref()
501            .or(result_elem.result.as_deref())
502            .unwrap_or("unknown error");
503        if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
504            return Err(AppError::RateLimited {
505                detail: err_msg.to_string(),
506            });
507        }
508        return Err(AppError::Validation(format!(
509            "claude extraction failed: {err_msg}"
510        )));
511    }
512
513    let extraction = result_elem
514        .structured_output
515        .clone()
516        .or_else(|| {
517            result_elem
518                .result
519                .as_ref()
520                .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
521        })
522        .ok_or_else(|| {
523            AppError::Validation("claude result missing structured_output and result field".into())
524        })?;
525
526    let cost = result_elem.total_cost_usd.unwrap_or(0.0);
527
528    Ok((extraction, cost, is_oauth))
529}
530
531use crate::output::emit_json_line as emit_json;
532
533/// Collects files matching the pattern (reuses ingest logic).
534fn collect_matching_files(
535    dir: &Path,
536    pattern: &str,
537    recursive: bool,
538    max_files: usize,
539) -> Result<Vec<PathBuf>, AppError> {
540    let mut files = Vec::new();
541    super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
542    files.sort_unstable();
543
544    if files.len() > max_files {
545        return Err(AppError::Validation(format!(
546            "found {} files, exceeds --max-files cap of {}",
547            files.len(),
548            max_files
549        )));
550    }
551
552    Ok(files)
553}
554
555/// Opens or creates the queue database for tracking ingest progress.
556fn open_queue_db(path: &str) -> Result<Connection, AppError> {
557    let conn = Connection::open(path)?;
558
559    conn.pragma_update(None, "journal_mode", "wal")?;
560
561    conn.execute_batch(
562        "CREATE TABLE IF NOT EXISTS queue (
563            id          INTEGER PRIMARY KEY AUTOINCREMENT,
564            file_path   TEXT NOT NULL UNIQUE,
565            name        TEXT,
566            status      TEXT NOT NULL DEFAULT 'pending',
567            memory_id   INTEGER,
568            entities    INTEGER DEFAULT 0,
569            rels        INTEGER DEFAULT 0,
570            error       TEXT,
571            cost_usd    REAL DEFAULT 0.0,
572            attempt     INTEGER DEFAULT 0,
573            elapsed_ms  INTEGER,
574            created_at  TEXT DEFAULT (datetime('now')),
575            done_at     TEXT
576        );
577        CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
578    )?;
579
580    Ok(conn)
581}
582
583/// Main entry point for `ingest --mode claude-code`.
584pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
585    let started = Instant::now();
586
587    if !args.dir.exists() {
588        return Err(AppError::Validation(format!(
589            "directory not found: {}",
590            args.dir.display()
591        )));
592    }
593
594    // G28-B (v1.0.68) + G30 (v1.0.69): acquire singleton before doing real
595    // work so two parallel `ingest --mode claude-code` invocations cannot
596    // co-exist on the same database. Scope includes the database hash so
597    // concurrent ingest against different databases is allowed.
598    let early_ns = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
599    let early_paths = AppPaths::resolve(args.db.as_deref())?;
600    let _singleton = crate::lock::acquire_job_singleton(
601        crate::lock::JobType::IngestClaudeCode,
602        &early_ns,
603        &early_paths.db,
604        args.wait_job_singleton,
605        args.force_job_singleton,
606    )?;
607
608    // Stage 1: Validate
609    let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
610    let version = validate_claude_version(&claude_binary)?;
611    tracing::info!(
612        target: "ingest",
613        binary = %claude_binary.display(),
614        version = %version,
615        "Claude Code binary validated"
616    );
617
618    emit_json(&PhaseEvent {
619        phase: "validate",
620        claude_path: claude_binary.to_str(),
621        version: Some(&version),
622        dir: None,
623        files_total: None,
624        files_new: None,
625        files_existing: None,
626    });
627
628    // Stage 2: Scan
629    let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
630
631    let queue_conn = open_queue_db(&args.queue_db)?;
632
633    if args.resume {
634        let reset = queue_conn
635            .execute(
636                "UPDATE queue SET status='pending' WHERE status='processing'",
637                [],
638            )
639            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
640        if reset > 0 {
641            tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
642        }
643    }
644
645    if args.retry_failed {
646        let count = queue_conn
647            .execute(
648                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
649                [],
650            )
651            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
652        tracing::info!(target: "ingest", count, "retrying failed files");
653    }
654
655    if !args.resume && !args.retry_failed {
656        queue_conn
657            .execute("DELETE FROM queue", [])
658            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
659    }
660
661    let mut new_count = 0usize;
662    let mut existing_count = 0usize;
663
664    if !args.retry_failed {
665        for file in &files {
666            let file_str = file.to_string_lossy().into_owned();
667            let inserted = queue_conn
668                .execute(
669                    "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
670                    rusqlite::params![file_str],
671                )
672                .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
673            if inserted > 0 {
674                new_count += 1;
675            } else {
676                existing_count += 1;
677            }
678        }
679    }
680
681    emit_json(&PhaseEvent {
682        phase: "scan",
683        claude_path: None,
684        version: None,
685        dir: args.dir.to_str(),
686        files_total: Some(files.len()),
687        files_new: Some(new_count),
688        files_existing: Some(existing_count),
689    });
690
691    if args.dry_run {
692        for (idx, file) in files.iter().enumerate() {
693            let (name, _truncated, _orig) =
694                super::ingest::derive_kebab_name(file, args.max_name_length);
695            emit_json(&FileEvent {
696                file: &file.to_string_lossy(),
697                name: &name,
698                status: "preview",
699                memory_id: None,
700                entities: None,
701                rels: None,
702                cost_usd: None,
703                elapsed_ms: None,
704                error: None,
705                index: idx,
706                total: files.len(),
707            });
708        }
709        emit_json(&Summary {
710            summary: true,
711            files_total: files.len(),
712            completed: 0,
713            failed: 0,
714            skipped: 0,
715            entities_total: 0,
716            rels_total: 0,
717            cost_usd: 0.0,
718            elapsed_ms: started.elapsed().as_millis() as u64,
719        });
720        if !args.keep_queue {
721            let _ = std::fs::remove_file(&args.queue_db);
722        }
723        return Ok(());
724    }
725
726    // Stage 3: Process
727    let paths = AppPaths::resolve(args.db.as_deref())?;
728    ensure_db_ready(&paths)?;
729    let conn = open_rw(&paths.db)?;
730
731    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
732    let memory_type_str = args.r#type.as_str().to_string();
733
734    let mut completed = 0usize;
735    let mut failed = 0usize;
736    let skipped_initial: usize = queue_conn
737        .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
738            r.get::<_, usize>(0)
739        })
740        .unwrap_or(0);
741    let mut skipped = skipped_initial;
742    let mut entities_total = 0usize;
743    let mut rels_total = 0usize;
744    let mut cost_total = 0.0f64;
745    let mut oauth_detected = false;
746    let total = files.len();
747
748    let mut backoff_secs = args.rate_limit_wait;
749    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
750
751    loop {
752        if crate::shutdown_requested() {
753            tracing::info!(target: "ingest", "shutdown requested, stopping before next file");
754            break;
755        }
756
757        let pending: Option<(i64, String)> = queue_conn
758            .query_row(
759                "UPDATE queue SET status='processing', attempt=attempt+1 \
760                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
761                 RETURNING id, file_path",
762                [],
763                |row| Ok((row.get(0)?, row.get(1)?)),
764            )
765            .ok();
766
767        let (queue_id, file_path) = match pending {
768            Some(p) => p,
769            None => break,
770        };
771
772        let file_started = Instant::now();
773
774        // G05: reject files that exceed the 10 MB stdin limit
775        const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
776        if let Ok(meta) = std::fs::metadata(&file_path) {
777            if meta.len() > MAX_FILE_SIZE {
778                let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
779                let _ = queue_conn.execute(
780                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
781                    rusqlite::params![err_msg, queue_id],
782                );
783                let current_index = completed + failed + skipped;
784                failed += 1;
785                emit_json(&FileEvent {
786                    file: &file_path,
787                    name: "",
788                    status: "failed",
789                    memory_id: None,
790                    entities: None,
791                    rels: None,
792                    cost_usd: None,
793                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
794                    error: Some(&err_msg),
795                    index: current_index,
796                    total,
797                });
798                if args.fail_fast {
799                    break;
800                }
801                continue;
802            }
803        }
804
805        let file_content = match std::fs::read(&file_path) {
806            Ok(c) => c,
807            Err(e) => {
808                let err_msg = format!("IO error: {e}");
809                let _ = queue_conn.execute(
810                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
811                    rusqlite::params![err_msg, queue_id],
812                );
813                let current_index = completed + failed + skipped;
814                failed += 1;
815                emit_json(&FileEvent {
816                    file: &file_path,
817                    name: "",
818                    status: "failed",
819                    memory_id: None,
820                    entities: None,
821                    rels: None,
822                    cost_usd: None,
823                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
824                    error: Some(&err_msg),
825                    index: current_index,
826                    total,
827                });
828                if args.fail_fast {
829                    break;
830                }
831                continue;
832            }
833        };
834
835        // B08: skip files exceeding body cap BEFORE sending to LLM to avoid wasting tokens
836        if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
837            let err_msg = format!(
838                "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
839                crate::constants::MAX_MEMORY_BODY_LEN,
840                file_content.len()
841            );
842            tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
843            let _ = queue_conn.execute(
844                "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
845                rusqlite::params![err_msg, queue_id],
846            );
847            let current_index = completed + failed + skipped;
848            skipped += 1;
849            emit_json(&FileEvent {
850                file: &file_path,
851                name: "",
852                status: "skipped",
853                memory_id: None,
854                entities: None,
855                rels: None,
856                cost_usd: None,
857                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
858                error: Some(&err_msg),
859                index: current_index,
860                total,
861            });
862            continue;
863        }
864
865        // B07: retry once on cold-start failure (Claude Code Issue #23265)
866        let max_extract_attempts: u32 = 2;
867        let mut extraction_result: Option<(ExtractionResult, f64, bool)> = None;
868        let mut last_extract_err: Option<String> = None;
869        let mut last_was_rate_limited = false;
870
871        for attempt in 1..=max_extract_attempts {
872            match extract_with_claude(
873                &claude_binary,
874                &file_content,
875                args.claude_model.as_deref(),
876                args.claude_timeout,
877            ) {
878                Ok(result) => {
879                    extraction_result = Some(result);
880                    break;
881                }
882                Err(ref e) if matches!(e, AppError::RateLimited { .. }) => {
883                    last_extract_err = Some(format!("{e}"));
884                    last_was_rate_limited = true;
885                    break;
886                }
887                Err(e) => {
888                    let msg = format!("{e}");
889                    if attempt < max_extract_attempts {
890                        let cold_start_delay = 2 * attempt as u64;
891                        tracing::warn!(target: "ingest", attempt, delay_secs = cold_start_delay, error = %msg, "extraction failed, retrying (cold-start workaround)");
892                        std::thread::sleep(std::time::Duration::from_secs(cold_start_delay));
893                    }
894                    last_extract_err = Some(msg);
895                }
896            }
897        }
898
899        if let Some((extraction, cost, is_oauth)) = extraction_result {
900            if is_oauth && !oauth_detected {
901                oauth_detected = true;
902                tracing::info!(target: "ingest", "OAuth subscription detected — cost_usd omitted from output");
903            }
904            backoff_secs = args.rate_limit_wait;
905
906            let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
907                std::path::Path::new(&extraction.name),
908                args.max_name_length,
909            );
910            let name = &normalized_name;
911            let ent_count = extraction.entities.len();
912            let rel_count = 0;
913
914            let new_entities: Vec<NewEntity> = extraction
915                .entities
916                .iter()
917                .filter_map(|e| match e.entity_type.parse::<EntityType>() {
918                    Ok(et) => Some(NewEntity {
919                        name: e.name.clone(),
920                        entity_type: et,
921                        description: None,
922                    }),
923                    Err(_) => {
924                        tracing::warn!(
925                            target: "ingest",
926                            entity = %e.name,
927                            entity_type = %e.entity_type,
928                            "entity type not recognized, skipping"
929                        );
930                        None
931                    }
932                })
933                .collect();
934
935            let new_relationships: Vec<NewRelationship> = extraction
936                .relationships
937                .iter()
938                .map(|r| NewRelationship {
939                    source: r.source.clone(),
940                    target: r.target.clone(),
941                    relation: crate::parsers::normalize_relation(&r.relation),
942                    strength: r.strength,
943                    description: None,
944                })
945                .collect();
946
947            let body_str = String::from_utf8(file_content.clone())
948                .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
949            let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
950            let new_memory = NewMemory {
951                name: name.clone(),
952                namespace: namespace.clone(),
953                memory_type: memory_type_str.clone(),
954                description: extraction.description.clone(),
955                body: body_str.to_string(),
956                body_hash,
957                session_id: None,
958                source: "agent".to_string(),
959                metadata: serde_json::Value::Object(serde_json::Map::new()),
960            };
961
962            // B06: deduplication — update existing memory instead of failing on UNIQUE
963            let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
964                Some((existing_id, is_deleted)) => {
965                    if is_deleted {
966                        memories::clear_deleted_at(&conn, existing_id)?;
967                    }
968                    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
969                        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
970                        rusqlite::params![existing_id],
971                        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
972                    )?;
973                    memories::update(&conn, existing_id, &new_memory, None)?;
974                    memories::sync_fts_after_update(
975                        &conn,
976                        existing_id,
977                        &old_name,
978                        &old_desc,
979                        &old_body,
980                        &new_memory.name,
981                        &new_memory.description,
982                        &new_memory.body,
983                    )?;
984                    tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
985                    existing_id
986                }
987                None => match memories::insert(&conn, &new_memory) {
988                    Ok(id) => id,
989                    Err(e) => {
990                        let err_msg = format!("{e}");
991                        let _ = queue_conn.execute(
992                                "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
993                                rusqlite::params![err_msg, queue_id],
994                            );
995                        let current_index = completed + failed + skipped;
996                        failed += 1;
997                        emit_json(&FileEvent {
998                            file: &file_path,
999                            name,
1000                            status: "failed",
1001                            memory_id: None,
1002                            entities: None,
1003                            rels: None,
1004                            cost_usd: if is_oauth { None } else { Some(cost) },
1005                            elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1006                            error: Some(&err_msg),
1007                            index: current_index,
1008                            total,
1009                        });
1010                        if !is_oauth {
1011                            cost_total += cost;
1012                        }
1013                        if args.fail_fast {
1014                            break;
1015                        }
1016                        continue;
1017                    }
1018                },
1019            };
1020
1021            for ent in &new_entities {
1022                match entities::upsert_entity(&conn, &namespace, ent) {
1023                    Ok(eid) => {
1024                        let _ = entities::link_memory_entity(&conn, memory_id, eid);
1025                    }
1026                    Err(e) => {
1027                        tracing::warn!(
1028                            target: "ingest",
1029                            entity = %ent.name,
1030                            error = %e,
1031                            "entity skipped due to validation error"
1032                        );
1033                    }
1034                }
1035            }
1036            for rel in &new_relationships {
1037                crate::parsers::warn_if_non_canonical(&rel.relation);
1038                let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
1039                let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
1040                if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1041                    let _ = conn.execute(
1042                        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1043                        rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1044                    );
1045                }
1046            }
1047
1048            // G01: embedding pipeline — enables recall to find memories created via --mode claude-code
1049            let body_text = String::from_utf8(file_content.clone())
1050                .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
1051            let snippet: String = body_text.chars().take(200).collect();
1052            let chunks_info = crate::chunking::split_into_chunks_hierarchical(&body_text);
1053
1054            // v1.0.89 (GAP-EMBED-PROPAGATION): honour --llm-backend via embed_passage_with_choice.
1055            let embedding_result = if chunks_info.len() <= 1 {
1056                crate::embedder::embed_passage_with_choice(&paths.models, &body_text, None)
1057                    .map(|(v, _)| v)
1058            } else {
1059                let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
1060                let mut multi_ok = true;
1061                for chunk in &chunks_info {
1062                    let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
1063                    match crate::embedder::embed_passage_with_choice(
1064                        &paths.models,
1065                        chunk_text,
1066                        None,
1067                    )
1068                    .map(|(v, _)| v)
1069                    {
1070                        Ok(emb) => chunk_embeddings.push(emb),
1071                        Err(e) => {
1072                            tracing::warn!(
1073                                target: "ingest",
1074                                file = %file_path,
1075                                error = %e,
1076                                "chunk embedding failed, skipping vector index for this file"
1077                            );
1078                            multi_ok = false;
1079                            break;
1080                        }
1081                    }
1082                }
1083                if multi_ok {
1084                    let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
1085                    // persist per-chunk vectors
1086                    if let Err(e) = crate::storage::chunks::insert_chunk_slices(
1087                        &conn,
1088                        memory_id,
1089                        &body_text,
1090                        &chunks_info,
1091                    ) {
1092                        tracing::warn!(
1093                            target: "ingest",
1094                            file = %file_path,
1095                            error = %e,
1096                            "chunk slice insert failed"
1097                        );
1098                    } else {
1099                        for (i, emb) in chunk_embeddings.iter().enumerate() {
1100                            if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
1101                                &conn, i as i64, memory_id, i as i32, emb,
1102                            ) {
1103                                tracing::warn!(
1104                                    target: "ingest",
1105                                    file = %file_path,
1106                                    chunk = i,
1107                                    error = %e,
1108                                    "chunk vec upsert failed"
1109                                );
1110                            }
1111                        }
1112                    }
1113                    Ok(aggregated)
1114                } else {
1115                    // fallback: embed whole body for the memory-level vector
1116                    crate::embedder::embed_passage_with_choice(&paths.models, &body_text, None)
1117                        .map(|(v, _)| v)
1118                }
1119            };
1120
1121            match embedding_result {
1122                Ok(embedding) => {
1123                    if let Err(e) = memories::upsert_vec(
1124                        &conn,
1125                        memory_id,
1126                        &namespace,
1127                        &memory_type_str,
1128                        &embedding,
1129                        name,
1130                        &snippet,
1131                    ) {
1132                        tracing::warn!(
1133                            target: "ingest",
1134                            file = %file_path,
1135                            error = %e,
1136                            "memory vec upsert failed; recall may not find this memory"
1137                        );
1138                    }
1139                    // embed each entity that was successfully upserted
1140                    for ent in &new_entities {
1141                        if let Ok(Some(eid)) =
1142                            entities::find_entity_id(&conn, &namespace, &ent.name)
1143                        {
1144                            let entity_text = ent.name.clone();
1145                            match crate::embedder::embed_passage_with_choice(
1146                                &paths.models,
1147                                &entity_text,
1148                                None,
1149                            )
1150                            .map(|(v, _)| v)
1151                            {
1152                                Ok(emb) => {
1153                                    if let Err(e) = entities::upsert_entity_vec(
1154                                        &conn,
1155                                        eid,
1156                                        &namespace,
1157                                        ent.entity_type,
1158                                        &emb,
1159                                        &ent.name,
1160                                    ) {
1161                                        tracing::warn!(
1162                                            target: "ingest",
1163                                            entity = %ent.name,
1164                                            error = %e,
1165                                            "entity vec upsert failed"
1166                                        );
1167                                    }
1168                                }
1169                                Err(e) => {
1170                                    tracing::warn!(
1171                                        target: "ingest",
1172                                        entity = %ent.name,
1173                                        error = %e,
1174                                        "entity embedding failed"
1175                                    );
1176                                }
1177                            }
1178                        }
1179                    }
1180                }
1181                Err(e) => {
1182                    tracing::warn!(
1183                        target: "ingest",
1184                        file = %file_path,
1185                        error = %e,
1186                        "memory embedding failed; recall will not find this memory"
1187                    );
1188                }
1189            }
1190
1191            let _ = queue_conn.execute(
1192                "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1193                rusqlite::params![
1194                    name,
1195                    memory_id,
1196                    ent_count,
1197                    rel_count,
1198                    cost,
1199                    file_started.elapsed().as_millis() as i64,
1200                    queue_id
1201                ],
1202            );
1203
1204            let current_index = completed + failed + skipped;
1205            completed += 1;
1206            entities_total += ent_count;
1207            rels_total += rel_count;
1208            if !is_oauth {
1209                cost_total += cost;
1210            }
1211
1212            emit_json(&FileEvent {
1213                file: &file_path,
1214                name,
1215                status: "done",
1216                memory_id: Some(memory_id),
1217                entities: Some(ent_count),
1218                rels: Some(rel_count),
1219                cost_usd: if is_oauth { None } else { Some(cost) },
1220                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1221                error: None,
1222                index: current_index,
1223                total,
1224            });
1225        } else if let Some(ref err_str) = last_extract_err {
1226            if last_was_rate_limited {
1227                if crate::retry::is_kill_switch_active() {
1228                    tracing::warn!(target: "ingest", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1229                } else if std::time::Instant::now() >= rate_limit_deadline {
1230                    tracing::error!(target: "ingest", "rate-limit retry deadline (1h) exhausted");
1231                } else {
1232                    let half = backoff_secs / 2;
1233                    let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1234                    let actual_wait = half + jitter;
1235                    tracing::warn!(target: "ingest", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
1236                    let _ = queue_conn.execute(
1237                        "UPDATE queue SET status='pending' WHERE id=?1",
1238                        rusqlite::params![queue_id],
1239                    );
1240                    std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1241                    backoff_secs = (backoff_secs * 2).min(900);
1242                    continue;
1243                }
1244            } else {
1245                let _ = queue_conn.execute(
1246                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1247                    rusqlite::params![err_str, queue_id],
1248                );
1249                let current_index = completed + failed + skipped;
1250                failed += 1;
1251                emit_json(&FileEvent {
1252                    file: &file_path,
1253                    name: "",
1254                    status: "failed",
1255                    memory_id: None,
1256                    entities: None,
1257                    rels: None,
1258                    cost_usd: None,
1259                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1260                    error: Some(err_str),
1261                    index: current_index,
1262                    total,
1263                });
1264                if args.fail_fast {
1265                    break;
1266                }
1267            }
1268        }
1269
1270        if let Some(budget) = args.max_cost_usd {
1271            if oauth_detected {
1272                tracing::debug!(target: "ingest", "--max-cost-usd ignored: OAuth subscription detected");
1273            } else if cost_total >= budget {
1274                tracing::warn!(
1275                    target: "ingest",
1276                    spent = cost_total,
1277                    budget = budget,
1278                    "budget exceeded, stopping"
1279                );
1280                break;
1281            }
1282        }
1283    }
1284
1285    // Stage 4: Summary
1286    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1287    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1288
1289    emit_json(&Summary {
1290        summary: true,
1291        files_total: total,
1292        completed,
1293        failed,
1294        skipped,
1295        entities_total,
1296        rels_total,
1297        cost_usd: cost_total,
1298        elapsed_ms: started.elapsed().as_millis() as u64,
1299    });
1300
1301    if !args.keep_queue && failed == 0 {
1302        let _ = std::fs::remove_file(&args.queue_db);
1303    }
1304
1305    Ok(())
1306}
1307
#[cfg(test)]
mod tests {
    //! Unit tests for the claude-code ingest helpers: schema validity,
    //! `parse_claude_output` success/error/fallback paths, OAuth detection,
    //! and `find_claude_binary` lookup failure.

    use super::*;

    /// Snapshots a fixed set of environment variables and restores them on drop.
    ///
    /// Restoring in `Drop` means the original values come back even when the
    /// test body panics. Variables that were originally unset are removed
    /// again rather than being left with the test's temporary value.
    struct EnvGuard {
        saved: Vec<(&'static str, Option<std::ffi::OsString>)>,
    }

    impl EnvGuard {
        /// Records the current value (or absence) of each variable in `keys`.
        fn capture(keys: &[&'static str]) -> Self {
            Self {
                saved: keys.iter().map(|&k| (k, std::env::var_os(k))).collect(),
            }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            for (key, value) in &self.saved {
                match value {
                    Some(v) => std::env::set_var(key, v),
                    None => std::env::remove_var(key),
                }
            }
        }
    }

    /// The embedded JSON schema passed via `--json-schema` must itself parse.
    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// A successful `result` event yields the structured output and its cost.
    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost, _is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    /// An `is_error` result surfaces the `error` text in the returned error.
    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// A `rate_limit` error message maps to `AppError::RateLimited`.
    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Non-JSON output is rejected rather than panicking.
    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    /// With `PATH` pointing nowhere and no override variable, lookup must fail.
    ///
    /// NOTE(review): this mutates process-wide environment variables. Cargo
    /// runs tests on parallel threads, so any other test that reads `PATH` or
    /// `SQLITE_GRAPHRAG_CLAUDE_BINARY` at the same moment may observe the
    /// temporary values. Consider serializing env-mutating tests.
    #[test]
    fn test_find_claude_binary_not_found() {
        // Restores both variables (or removes them if they were unset) when the
        // test exits, including on panic.
        let _env = EnvGuard::capture(&["PATH", "SQLITE_GRAPHRAG_CLAUDE_BINARY"]);
        std::env::set_var("PATH", "/nonexistent");
        std::env::remove_var("SQLITE_GRAPHRAG_CLAUDE_BINARY");
        let result = find_claude_binary(None);
        assert!(result.is_err());
    }

    /// When `structured_output` is null, the JSON string in `result` is used.
    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost, _is_oauth) =
            parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    /// Error text carried in the `result` field (no `error` key) is surfaced.
    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    /// `terminal_reason: max_turns` without `is_error` still parses successfully.
    #[test]
    fn test_terminal_reason_max_turns_detected() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"terminal_reason":"max_turns","structured_output":{"name":"t","description":"d","entities":[],"relationships":[]}}
        ]"#;
        let err_or_ok = parse_claude_output(output);
        assert!(
            err_or_ok.is_ok(),
            "max_turns in result without is_error should still parse"
        );
    }

    /// `apiKeySource: "none"` in the init event is reported as OAuth; the cost
    /// is still parsed.
    #[test]
    fn test_detect_oauth_from_init_json() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.50,"structured_output":{"name":"test-oauth","description":"oauth test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(is_oauth, "apiKeySource=none must be detected as OAuth");
        assert!((cost - 0.50).abs() < f64::EPSILON);
    }

    /// `apiKeySource: "env"` is an API-key session, not OAuth.
    #[test]
    fn test_api_key_source_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"env"},
            {"type":"result","is_error":false,"total_cost_usd":0.10,"structured_output":{"name":"test-api","description":"api test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "apiKeySource=env must NOT be detected as OAuth");
    }

    /// A missing `apiKeySource` defaults to not-OAuth.
    #[test]
    fn test_missing_api_key_source_defaults_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.05,"structured_output":{"name":"test-missing","description":"missing test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "missing apiKeySource must default to not OAuth");
    }

    /// Every `entity_type` value in the schema enum must parse as an `EntityType`.
    /// This keeps the LLM-facing schema and the Rust enum in sync.
    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value = serde_json::from_str(EXTRACTION_SCHEMA).unwrap();
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t.as_str().unwrap();
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}