Skip to main content

sqlite_graphrag/commands/
ingest_claude.rs

1//! Handler for `ingest --mode claude-code`.
2//!
3//! Orchestrates the locally installed Claude Code CLI binary (`claude -p`)
4//! to extract domain-specific entities and relationships from each file,
5//! then persists them via the same pipeline as `remember --graph-stdin`.
6//!
7//! Architecture: P1 One-Shot per file — each file spawns a separate
8//! `claude -p` process with `--json-schema` for guaranteed structured output.
9//! A SQLite queue DB tracks progress for resume/retry support.
10// Workload: Subprocess I/O-bound (claude -p headless with network wait)
11
12use crate::commands::ingest::IngestArgs;
13use crate::entity_type::EntityType;
14use crate::errors::AppError;
15use crate::paths::AppPaths;
16use crate::spawn::env_whitelist::apply_env_whitelist;
17use crate::storage::connection::{ensure_db_ready, open_rw};
18use crate::storage::entities::{self, NewEntity, NewRelationship};
19use crate::storage::memories::{self, NewMemory};
20
21use rusqlite::Connection;
22use serde::{Deserialize, Serialize};
23use std::io::Write;
24use std::path::{Path, PathBuf};
25use std::process::{Command, Stdio};
26use std::time::Instant;
27
/// Lowest Claude Code CLI version accepted by `validate_claude_version`,
/// written as `MAJOR.MINOR.PATCH`.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
29
/// JSON Schema passed to `claude -p` through `--json-schema`.
///
/// It describes the same shape as [`ExtractionResult`]: a `name`, a
/// `description`, a list of typed `entities`, and a list of typed
/// `relationships` whose `strength` lies in `[0, 1]`. Every object forbids
/// additional properties, and the entity/relation vocabularies are closed
/// enums.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
71
/// Instruction text passed as the `-p` argument of every `claude` invocation.
///
/// The document itself is supplied separately on the child's stdin. The
/// relationship vocabulary listed here matches the `relation` enum in
/// `EXTRACTION_SCHEMA`.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
85
/// One element of the JSON array that `claude -p --output-format json`
/// prints on stdout.
///
/// Only the fields inspected when diagnosing a non-zero exit are modelled.
#[derive(Debug, Deserialize)]
struct ClaudeOutputElement {
    /// Element kind; failure analysis looks for the `"result"` element.
    r#type: Option<String>,
    /// Whether the element reports an error; treated as `false` when absent.
    #[serde(default)]
    is_error: bool,
    /// Result text; used as the error message when `error` is missing.
    result: Option<String>,
    /// Error text, preferred over `result` when building the error message.
    error: Option<String>,
    /// Why the run ended; `"max_turns"` is handled as a dedicated failure.
    terminal_reason: Option<String>,
}
95
/// Structured extraction for one document, matching the top-level object
/// required by `EXTRACTION_SCHEMA`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Name for the document (the prompt asks for short kebab-case).
    pub name: String,
    /// Description of the document (the prompt asks for one sentence).
    pub description: String,
    /// Entities found in the document.
    pub entities: Vec<ExtractedEntity>,
    /// Typed relationships between the extracted entities.
    pub relationships: Vec<ExtractedRelationship>,
}
103
/// A single entity reported by the extractor.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedEntity {
    /// Entity name (the prompt asks for lowercase kebab-case).
    pub name: String,
    /// Entity kind as a raw string; `EXTRACTION_SCHEMA` limits it to a fixed
    /// enum (`project`, `tool`, `person`, ...).
    pub entity_type: String,
}
109
/// A typed, weighted edge between two extracted entities.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedRelationship {
    /// Name of the entity the relationship starts from.
    pub source: String,
    /// Name of the entity the relationship points to.
    pub target: String,
    /// Relation kind as a raw string; `EXTRACTION_SCHEMA` limits it to a
    /// fixed enum (`uses`, `depends-on`, ...).
    pub relation: String,
    /// Relationship strength; `EXTRACTION_SCHEMA` bounds it to `[0, 1]`.
    pub strength: f64,
}
117
/// Progress event emitted as one JSON line per pipeline phase.
///
/// Fields that are `None` are omitted from the serialised output, so each
/// phase only carries the keys relevant to it.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase name, e.g. `"validate"` or `"scan"`.
    phase: &'a str,
    /// Path of the resolved Claude Code binary (set by the validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    claude_path: Option<&'a str>,
    /// Version string reported by the binary (set by the validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Directory being ingested (set by the scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of files matched by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Files newly inserted into the queue by this run.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Files that were already present in the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
134
/// Per-file event emitted as one JSON line.
///
/// Optional fields are omitted from the serialised output when `None`.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the file the event refers to.
    file: &'a str,
    /// Name associated with the file; empty when the file failed or was
    /// skipped before a name was derived.
    name: &'a str,
    /// Outcome label, e.g. `"preview"`, `"failed"` or `"skipped"`.
    status: &'a str,
    /// Identifier of the stored memory, when one is reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Entity count for this file, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Relationship count for this file, when reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Cost reported for this file, in USD.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on this file, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message for failed or skipped files.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Zero-based position of this file in the run.
    index: usize,
    /// Total number of files in the run.
    total: usize,
}
155
/// Final JSON line summarising an ingest run.
#[derive(Debug, Serialize)]
struct Summary {
    /// Marker flag distinguishing the summary line from per-file events.
    summary: bool,
    /// Number of files matched by the scan.
    files_total: usize,
    /// Count of completed files.
    completed: usize,
    /// Count of failed files.
    failed: usize,
    /// Count of skipped files.
    skipped: usize,
    /// Total entity count across the run.
    entities_total: usize,
    /// Total relationship count across the run.
    rels_total: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Wall-clock duration of the whole run, in milliseconds.
    elapsed_ms: u64,
}
168
169/// Locates the Claude Code binary on the system.
170pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
171    if let Some(p) = explicit {
172        if p.exists() {
173            return Ok(p.to_path_buf());
174        }
175        return Err(AppError::Validation(format!(
176            "Claude Code binary not found at explicit path: {}",
177            p.display()
178        )));
179    }
180
181    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
182        let p = PathBuf::from(&env_path);
183        if p.exists() {
184            return Ok(p);
185        }
186    }
187
188    let name = if cfg!(windows) {
189        "claude.exe"
190    } else {
191        "claude"
192    };
193    if let Some(path_var) = std::env::var_os("PATH") {
194        for dir in std::env::split_paths(&path_var) {
195            let candidate = dir.join(name);
196            if candidate.exists() {
197                return Ok(candidate);
198            }
199        }
200    }
201
202    Err(AppError::Validation(
203        "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
204    ))
205}
206
207/// Validates that the Claude Code binary meets the minimum version.
208fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
209    let output = Command::new(binary)
210        .arg("--version")
211        .stdin(Stdio::null())
212        .stdout(Stdio::piped())
213        .stderr(Stdio::piped())
214        .output()
215        .map_err(AppError::Io)?;
216
217    if !output.status.success() {
218        return Err(AppError::Validation(
219            "failed to run 'claude --version'".to_string(),
220        ));
221    }
222
223    let version_str = String::from_utf8(output.stdout)
224        .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
225    let version = version_str.trim().to_string();
226
227    // Extract the numeric version part before first space or paren, e.g. "2.1.149 (Claude Code)" -> "2.1.149"
228    let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
229
230    fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
231        let parts: Vec<&str> = s.splitn(3, '.').collect();
232        if parts.len() < 2 {
233            return None;
234        }
235        let major = parts[0].parse::<u64>().ok()?;
236        let minor = parts[1].parse::<u64>().ok()?;
237        let patch = parts
238            .get(2)
239            .and_then(|p| p.parse::<u64>().ok())
240            .unwrap_or(0);
241        Some((major, minor, patch))
242    }
243
244    if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
245        if actual < min {
246            return Err(AppError::Validation(format!(
247                "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
248            )));
249        }
250    }
251
252    Ok(version)
253}
254
255/// Invokes `claude -p` for a single file and returns the extraction result.
256///
257/// OAuth-only enforcement (gaps.md:41-49, v1.0.69 mandate):
258///
259/// - `wait-timeout` for cross-platform subprocess timeout.
260/// - `env_clear()` for least-privilege environment.
261/// - OAuth-only flow: NO `--bare` (PROHIBITED, gaps.md:49), no API-key path.
262/// - Mandatory hardening: `--strict-mcp-config --mcp-config '{}'` to zero
263///   MCP servers, and `--settings '{"hooks":{}}'` to disable hooks.
264/// - If `ANTHROPIC_API_KEY` is set in the environment we ABORT the spawn
265///   (return a `false` command with a violation marker) — API-key path is
266///   PROHIBITED in this project.
267fn extract_with_claude(
268    binary: &Path,
269    file_content: &[u8],
270    model: Option<&str>,
271    timeout_secs: u64,
272) -> Result<(ExtractionResult, f64, bool), AppError> {
273    use wait_timeout::ChildExt;
274
275    // OAuth-only guard (gaps.md:47). If `ANTHROPIC_API_KEY` is set in the
276    // environment we MUST abort — that is the API-key path which is
277    // explicitly PROHIBITED. Use the OAuth flow exclusively.
278    if let Ok(_key) = std::env::var("ANTHROPIC_API_KEY") {
279        let mut cmd = Command::new("false");
280        cmd.env_clear();
281        cmd.env("PATH", "/nonexistent");
282        cmd.arg("--oauth-only-violation-anthropic-api-key-set");
283        return Err(AppError::Validation(
284            "ANTHROPIC_API_KEY is set in the environment; \
285             sqlite-graphrag operates exclusively with OAuth (Pro/Max) and \
286             the API-key path is PROHIBITED (gaps.md:47). Unset the variable \
287             and re-run with `claude login` already completed in this session."
288                .to_string(),
289        ));
290    }
291
292    let mut cmd = Command::new(binary);
293
294    // v1.0.83 (ADR-0041): env whitelist delegated to the shared helper.
295    // `ANTHROPIC_API_KEY` is INTENTIONALLY ABSENT (defence-in-depth).
296    apply_env_whitelist(&mut cmd, crate::spawn::env_whitelist::is_strict_env_clear());
297    crate::spawn::apply_cwd_isolation(&mut cmd)?;
298
299    // Canonical OAuth-only command line (gaps.md:201-208 + 211-213).
300    // `--bare` is PROHIBITED (gaps.md:49) — never emitted.
301    //
302    // GAP-META-005 (v1.0.87, ADR-0045): `--mcp-config '{}'` inline is
303    // rejected by Claude Code 2.1.177. Substitute the literal for a
304    // tempfile path containing `{"mcpServers":{}}`.
305    let mcp_config_path = crate::spawn::preflight::write_empty_mcp_config_tempfile()?;
306
307    cmd.arg("-p")
308        .arg(EXTRACTION_PROMPT)
309        .arg("--strict-mcp-config")
310        .arg("--mcp-config")
311        .arg(mcp_config_path.as_os_str())
312        .arg("--dangerously-skip-permissions")
313        .arg("--settings")
314        .arg(r#"{"hooks":{}}"#)
315        .arg("--output-format")
316        .arg("json")
317        .arg("--json-schema")
318        .arg(EXTRACTION_SCHEMA)
319        .arg("--max-turns")
320        .arg("7")
321        .arg("--no-session-persistence");
322
323    if let Some(m) = model {
324        cmd.arg("--model").arg(m);
325    }
326
327    cmd.stdin(Stdio::piped())
328        .stdout(Stdio::piped())
329        .stderr(Stdio::piped());
330
331    // GAP-META-005 (v1.0.87, ADR-0045): pre-flight gate runs after argv
332    // is fully built. Validates binary, argv-size, walk-up of `.mcp.json`,
333    // and `CLAUDE_CONFIG_DIR` cleanliness.
334    let argv_refs: Vec<std::ffi::OsString> = cmd.get_args().map(|s| s.to_os_string()).collect();
335    let preflight_args = crate::spawn::preflight::PreFlightArgs {
336        binary_path: binary,
337        argv: &argv_refs,
338        workspace_root: std::path::Path::new("."),
339        mcp_config_inline_json: None,
340        expected_output_bytes: 65_536,
341        spawner_name: "ingest_claude",
342    };
343    if let Err(e) = crate::spawn::preflight::preflight_check(&preflight_args) {
344        // v1.0.88 (BUG-6 fix, ADR-0046): propagate the structured
345        // `PreFlightError` via the `From` impl in `errors.rs` so callers
346        // receive `AppError::PreFlightFailed` (exit 16) instead of a
347        // bare `std::process::exit(16)` that discards the variant name,
348        // tracing context, and PT-BR i18n.
349        return Err(crate::errors::AppError::from(e));
350    }
351
352    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
353        AppError::Io(std::io::Error::new(
354            e.kind(),
355            format!("failed to spawn claude: {e}"),
356        ))
357    })?;
358
359    let stdin_data = file_content.to_vec();
360    let mut child_stdin = child
361        .stdin
362        .take()
363        .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
364    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
365        child_stdin.write_all(&stdin_data)?;
366        drop(child_stdin);
367        Ok(())
368    });
369
370    let start = std::time::Instant::now();
371    let timeout = std::time::Duration::from_secs(timeout_secs);
372    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
373
374    match status {
375        Some(exit_status) => {
376            stdin_thread
377                .join()
378                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
379                .map_err(AppError::Io)?;
380
381            tracing::debug!(
382                target: "process",
383                exit_code = ?exit_status.code(),
384                elapsed_ms = start.elapsed().as_millis() as u64,
385                "external process completed"
386            );
387
388            let mut stdout_buf = Vec::new();
389            let mut stderr_buf = Vec::new();
390            if let Some(mut out) = child.stdout.take() {
391                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
392            }
393            if let Some(mut err) = child.stderr.take() {
394                std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
395            }
396
397            if !exit_status.success() {
398                let stdout_str = String::from_utf8_lossy(&stdout_buf);
399                if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
400                {
401                    if let Some(re) = elements
402                        .iter()
403                        .find(|e| e.r#type.as_deref() == Some("result"))
404                    {
405                        if re.terminal_reason.as_deref() == Some("max_turns") {
406                            tracing::warn!(
407                                target: "ingest",
408                                "extraction hit max_turns limit — hooks may have consumed turns"
409                            );
410                            return Err(AppError::Validation(
411                                "claude -p hit max_turns: hooks may be consuming turns".into(),
412                            ));
413                        }
414                        if re.is_error {
415                            let err_msg = re
416                                .error
417                                .as_deref()
418                                .or(re.result.as_deref())
419                                .unwrap_or("unknown error");
420                            if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
421                                return Err(AppError::RateLimited {
422                                    detail: err_msg.to_string(),
423                                });
424                            }
425                            if err_msg.contains("Not logged in")
426                                || err_msg.contains("authentication")
427                            {
428                                tracing::warn!(
429                                    target: "ingest",
430                                    "Claude Code authentication failed. Re-authenticate interactively with: claude"
431                                );
432                            }
433                            return Err(AppError::Validation(format!(
434                                "claude -p failed: {err_msg}"
435                            )));
436                        }
437                    }
438                }
439                let stderr_str = String::from_utf8_lossy(&stderr_buf);
440                if stderr_str.contains("auth") || stderr_str.contains("login") {
441                    tracing::warn!(
442                        target: "ingest",
443                        "Claude Code authentication may have failed. Re-authenticate with: claude"
444                    );
445                }
446                return Err(AppError::Validation(format!(
447                    "claude -p exited with code {:?}: {}",
448                    exit_status.code(),
449                    stderr_str.trim()
450                )));
451            }
452
453            let stdout = String::from_utf8(stdout_buf)
454                .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
455            parse_claude_output(&stdout)
456        }
457        None => {
458            tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
459            let _ = child.kill();
460            let _ = child.wait();
461            let _ = stdin_thread.join();
462            Err(AppError::Validation(format!(
463                "claude -p timed out after {timeout_secs} seconds"
464            )))
465        }
466    }
467}
468
469/// Parses the JSON array output from `claude -p --output-format json`.
470///
471/// Returns `(extraction, cost_usd, is_oauth)` where `is_oauth` is true when
472/// the init element reports `apiKeySource: "none"` (OAuth subscription).
473/// Parses `claude -p --output-format json` output into the ingest
474/// `ExtractionResult`. Delegates the shared element/error/oauth handling to
475/// `claude_runner::parse_claude_output` (DRY v1.0.97), which additionally
476/// covers G03 max_turns detection and the auth-failure warn, then deserialises
477/// the structured value into the typed `ExtractionResult`.
478fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64, bool), AppError> {
479    let result = super::claude_runner::parse_claude_output_opts(stdout, true)?;
480    let extraction: ExtractionResult = serde_json::from_value(result.value).map_err(|e| {
481        AppError::Validation(format!(
482            "failed to deserialize claude output as ExtractionResult: {e}"
483        ))
484    })?;
485    Ok((extraction, result.cost_usd, result.is_oauth))
486}
487
488use crate::output::emit_json_line as emit_json;
489
490/// Collects files matching the pattern (reuses ingest logic).
491fn collect_matching_files(
492    dir: &Path,
493    pattern: &str,
494    recursive: bool,
495    max_files: usize,
496) -> Result<Vec<PathBuf>, AppError> {
497    let mut files = Vec::new();
498    super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
499    files.sort_unstable();
500
501    if files.len() > max_files {
502        return Err(AppError::Validation(format!(
503            "found {} files, exceeds --max-files cap of {}",
504            files.len(),
505            max_files
506        )));
507    }
508
509    Ok(files)
510}
511
512/// Opens or creates the queue database for tracking ingest progress.
513fn open_queue_db<P: AsRef<std::path::Path>>(path: P) -> Result<Connection, AppError> {
514    let conn = Connection::open(path)?;
515
516    conn.pragma_update(None, "journal_mode", "wal")?;
517
518    conn.execute_batch(
519        "CREATE TABLE IF NOT EXISTS queue (
520            id          INTEGER PRIMARY KEY AUTOINCREMENT,
521            file_path   TEXT NOT NULL UNIQUE,
522            name        TEXT,
523            status      TEXT NOT NULL DEFAULT 'pending',
524            memory_id   INTEGER,
525            entities    INTEGER DEFAULT 0,
526            rels        INTEGER DEFAULT 0,
527            error       TEXT,
528            cost_usd    REAL DEFAULT 0.0,
529            attempt     INTEGER DEFAULT 0,
530            elapsed_ms  INTEGER,
531            created_at  TEXT DEFAULT (datetime('now')),
532            done_at     TEXT
533        );
534        CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
535    )?;
536
537    Ok(conn)
538}
539
540/// Main entry point for `ingest --mode claude-code`.
541pub fn run_claude_ingest(
542    args: &IngestArgs,
543    embedding_backend: crate::cli::EmbeddingBackendChoice,
544    llm_backend: crate::cli::LlmBackendChoice,
545) -> Result<(), AppError> {
546    let started = Instant::now();
547
548    if !args.dir.exists() {
549        return Err(AppError::Validation(format!(
550            "directory not found: {}",
551            args.dir.display()
552        )));
553    }
554
555    // G28-B (v1.0.68) + G30 (v1.0.69): acquire singleton before doing real
556    // work so two parallel `ingest --mode claude-code` invocations cannot
557    // co-exist on the same database. Scope includes the database hash so
558    // concurrent ingest against different databases is allowed.
559    let early_ns = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
560    let early_paths = AppPaths::resolve(args.db.as_deref())?;
561    let queue_path = match args.queue_db.as_deref() {
562        Some(p) => std::path::PathBuf::from(p),
563        None => crate::paths::sidecar_path(&early_paths.db, ".ingest-queue.sqlite"),
564    };
565    let _singleton = crate::lock::acquire_job_singleton(
566        crate::lock::JobType::IngestClaudeCode,
567        &early_ns,
568        &early_paths.db,
569        args.wait_job_singleton,
570        args.force_job_singleton,
571    )?;
572
573    // Stage 1: Validate
574    let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
575    let version = validate_claude_version(&claude_binary)?;
576    tracing::info!(
577        target: "ingest",
578        binary = %claude_binary.display(),
579        version = %version,
580        "Claude Code binary validated"
581    );
582
583    emit_json(&PhaseEvent {
584        phase: "validate",
585        claude_path: claude_binary.to_str(),
586        version: Some(&version),
587        dir: None,
588        files_total: None,
589        files_new: None,
590        files_existing: None,
591    });
592
593    // Stage 2: Scan
594    let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
595
596    let queue_conn = open_queue_db(&queue_path)?;
597
598    if args.resume {
599        let reset = queue_conn
600            .execute(
601                "UPDATE queue SET status='pending' WHERE status='processing'",
602                [],
603            )
604            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
605        if reset > 0 {
606            tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
607        }
608    }
609
610    if args.retry_failed {
611        let count = queue_conn
612            .execute(
613                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
614                [],
615            )
616            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
617        tracing::info!(target: "ingest", count, "retrying failed files");
618    }
619
620    if !args.resume && !args.retry_failed {
621        queue_conn
622            .execute("DELETE FROM queue", [])
623            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
624    }
625
626    let mut new_count = 0usize;
627    let mut existing_count = 0usize;
628
629    if !args.retry_failed {
630        for file in &files {
631            let file_str = file.to_string_lossy().into_owned();
632            let inserted = queue_conn
633                .execute(
634                    "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
635                    rusqlite::params![file_str],
636                )
637                .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
638            if inserted > 0 {
639                new_count += 1;
640            } else {
641                existing_count += 1;
642            }
643        }
644    }
645
646    emit_json(&PhaseEvent {
647        phase: "scan",
648        claude_path: None,
649        version: None,
650        dir: args.dir.to_str(),
651        files_total: Some(files.len()),
652        files_new: Some(new_count),
653        files_existing: Some(existing_count),
654    });
655
656    if args.dry_run {
657        for (idx, file) in files.iter().enumerate() {
658            let (name, _truncated, _orig) =
659                super::ingest::derive_kebab_name(file, args.max_name_length);
660            emit_json(&FileEvent {
661                file: &file.to_string_lossy(),
662                name: &name,
663                status: "preview",
664                memory_id: None,
665                entities: None,
666                rels: None,
667                cost_usd: None,
668                elapsed_ms: None,
669                error: None,
670                index: idx,
671                total: files.len(),
672            });
673        }
674        emit_json(&Summary {
675            summary: true,
676            files_total: files.len(),
677            completed: 0,
678            failed: 0,
679            skipped: 0,
680            entities_total: 0,
681            rels_total: 0,
682            cost_usd: 0.0,
683            elapsed_ms: started.elapsed().as_millis() as u64,
684        });
685        if !args.keep_queue {
686            let _ = std::fs::remove_file(&queue_path);
687        }
688        return Ok(());
689    }
690
691    // Stage 3: Process
692    let paths = AppPaths::resolve(args.db.as_deref())?;
693    ensure_db_ready(&paths)?;
694    let conn = open_rw(&paths.db)?;
695
696    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
697    let memory_type_str = args.r#type.as_str().to_string();
698
699    let mut completed = 0usize;
700    let mut failed = 0usize;
701    let skipped_initial: usize = queue_conn
702        .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
703            r.get::<_, usize>(0)
704        })
705        .unwrap_or(0);
706    let mut skipped = skipped_initial;
707    let mut entities_total = 0usize;
708    let mut rels_total = 0usize;
709    let mut cost_total = 0.0f64;
710    let mut oauth_detected = false;
711    let total = files.len();
712
713    let mut backoff_secs = args.rate_limit_wait;
714    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
715
716    loop {
717        if crate::shutdown_requested() {
718            tracing::info!(target: "ingest", "shutdown requested, stopping before next file");
719            break;
720        }
721
722        let pending: Option<(i64, String)> = queue_conn
723            .query_row(
724                "UPDATE queue SET status='processing', attempt=attempt+1 \
725                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
726                 RETURNING id, file_path",
727                [],
728                |row| Ok((row.get(0)?, row.get(1)?)),
729            )
730            .ok();
731
732        let (queue_id, file_path) = match pending {
733            Some(p) => p,
734            None => break,
735        };
736
737        let file_started = Instant::now();
738
739        // G05: reject files that exceed the 10 MB stdin limit
740        const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
741        if let Ok(meta) = std::fs::metadata(&file_path) {
742            if meta.len() > MAX_FILE_SIZE {
743                let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
744                let _ = queue_conn.execute(
745                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
746                    rusqlite::params![err_msg, queue_id],
747                );
748                let current_index = completed + failed + skipped;
749                failed += 1;
750                emit_json(&FileEvent {
751                    file: &file_path,
752                    name: "",
753                    status: "failed",
754                    memory_id: None,
755                    entities: None,
756                    rels: None,
757                    cost_usd: None,
758                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
759                    error: Some(&err_msg),
760                    index: current_index,
761                    total,
762                });
763                if args.fail_fast {
764                    break;
765                }
766                continue;
767            }
768        }
769
770        let file_content = match std::fs::read(&file_path) {
771            Ok(c) => c,
772            Err(e) => {
773                let err_msg = format!("IO error: {e}");
774                let _ = queue_conn.execute(
775                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
776                    rusqlite::params![err_msg, queue_id],
777                );
778                let current_index = completed + failed + skipped;
779                failed += 1;
780                emit_json(&FileEvent {
781                    file: &file_path,
782                    name: "",
783                    status: "failed",
784                    memory_id: None,
785                    entities: None,
786                    rels: None,
787                    cost_usd: None,
788                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
789                    error: Some(&err_msg),
790                    index: current_index,
791                    total,
792                });
793                if args.fail_fast {
794                    break;
795                }
796                continue;
797            }
798        };
799
800        // B08: skip files exceeding body cap BEFORE sending to LLM to avoid wasting tokens
801        if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
802            let err_msg = format!(
803                "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
804                crate::constants::MAX_MEMORY_BODY_LEN,
805                file_content.len()
806            );
807            tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
808            let _ = queue_conn.execute(
809                "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
810                rusqlite::params![err_msg, queue_id],
811            );
812            let current_index = completed + failed + skipped;
813            skipped += 1;
814            emit_json(&FileEvent {
815                file: &file_path,
816                name: "",
817                status: "skipped",
818                memory_id: None,
819                entities: None,
820                rels: None,
821                cost_usd: None,
822                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
823                error: Some(&err_msg),
824                index: current_index,
825                total,
826            });
827            continue;
828        }
829
830        // B07: retry once on cold-start failure (Claude Code Issue #23265)
831        let max_extract_attempts: u32 = 2;
832        let mut extraction_result: Option<(ExtractionResult, f64, bool)> = None;
833        let mut last_extract_err: Option<String> = None;
834        let mut last_was_rate_limited = false;
835
836        for attempt in 1..=max_extract_attempts {
837            match extract_with_claude(
838                &claude_binary,
839                &file_content,
840                args.claude_model.as_deref(),
841                args.claude_timeout,
842            ) {
843                Ok(result) => {
844                    extraction_result = Some(result);
845                    break;
846                }
847                Err(ref e) if matches!(e, AppError::RateLimited { .. }) => {
848                    last_extract_err = Some(format!("{e}"));
849                    last_was_rate_limited = true;
850                    break;
851                }
852                Err(e) => {
853                    let msg = format!("{e}");
854                    if attempt < max_extract_attempts {
855                        let cold_start_delay = 2 * attempt as u64;
856                        tracing::warn!(target: "ingest", attempt, delay_secs = cold_start_delay, error = %msg, "extraction failed, retrying (cold-start workaround)");
857                        std::thread::sleep(std::time::Duration::from_secs(cold_start_delay));
858                    }
859                    last_extract_err = Some(msg);
860                }
861            }
862        }
863
864        if let Some((extraction, cost, is_oauth)) = extraction_result {
865            if is_oauth && !oauth_detected {
866                oauth_detected = true;
867                tracing::info!(target: "ingest", "OAuth subscription detected — cost_usd omitted from output");
868            }
869            backoff_secs = args.rate_limit_wait;
870
871            let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
872                std::path::Path::new(&extraction.name),
873                args.max_name_length,
874            );
875            let name = &normalized_name;
876            let ent_count = extraction.entities.len();
877            let rel_count = 0;
878
879            // GAP-SG-47: fold non-canonical labels onto the nearest canonical
880            // kind instead of discarding the entity (no silent data loss).
881            let new_entities: Vec<NewEntity> = extraction
882                .entities
883                .iter()
884                .map(|e| NewEntity {
885                    name: e.name.clone(),
886                    entity_type: EntityType::map_to_canonical(&e.entity_type),
887                    description: None,
888                })
889                .collect();
890
891            // GAP-SG-48: rewrite non-canonical relations to canonical instead
892            // of normalizing-and-accepting them raw.
893            let new_relationships: Vec<NewRelationship> = extraction
894                .relationships
895                .iter()
896                .map(|r| NewRelationship {
897                    source: r.source.clone(),
898                    target: r.target.clone(),
899                    relation: crate::parsers::map_to_canonical_relation(&r.relation),
900                    strength: r.strength,
901                    description: None,
902                })
903                .collect();
904
905            let body_str = String::from_utf8(file_content.clone())
906                .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
907            let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
908            let new_memory = NewMemory {
909                name: name.clone(),
910                namespace: namespace.clone(),
911                memory_type: memory_type_str.clone(),
912                description: extraction.description.clone(),
913                body: body_str.to_string(),
914                body_hash,
915                session_id: None,
916                source: "agent".to_string(),
917                metadata: serde_json::Value::Object(serde_json::Map::new()),
918            };
919
920            // B06: deduplication — update existing memory instead of failing on UNIQUE
921            let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
922                Some((existing_id, is_deleted)) => {
923                    if is_deleted {
924                        memories::clear_deleted_at(&conn, existing_id)?;
925                    }
926                    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
927                        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
928                        rusqlite::params![existing_id],
929                        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
930                    )?;
931                    memories::update(&conn, existing_id, &new_memory, None)?;
932                    memories::sync_fts_after_update(
933                        &conn,
934                        existing_id,
935                        &old_name,
936                        &old_desc,
937                        &old_body,
938                        &new_memory.name,
939                        &new_memory.description,
940                        &new_memory.body,
941                    )?;
942                    tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
943                    existing_id
944                }
945                None => match memories::insert(&conn, &new_memory) {
946                    Ok(id) => id,
947                    Err(e) => {
948                        let err_msg = format!("{e}");
949                        let _ = queue_conn.execute(
950                                "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
951                                rusqlite::params![err_msg, queue_id],
952                            );
953                        let current_index = completed + failed + skipped;
954                        failed += 1;
955                        emit_json(&FileEvent {
956                            file: &file_path,
957                            name,
958                            status: "failed",
959                            memory_id: None,
960                            entities: None,
961                            rels: None,
962                            cost_usd: if is_oauth { None } else { Some(cost) },
963                            elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
964                            error: Some(&err_msg),
965                            index: current_index,
966                            total,
967                        });
968                        if !is_oauth {
969                            cost_total += cost;
970                        }
971                        if args.fail_fast {
972                            break;
973                        }
974                        continue;
975                    }
976                },
977            };
978
979            for ent in &new_entities {
980                match entities::upsert_entity(&conn, &namespace, ent) {
981                    Ok(eid) => {
982                        let _ = entities::link_memory_entity(&conn, memory_id, eid);
983                    }
984                    Err(e) => {
985                        tracing::warn!(
986                            target: "ingest",
987                            entity = %ent.name,
988                            error = %e,
989                            "entity skipped due to validation error"
990                        );
991                    }
992                }
993            }
994            for rel in &new_relationships {
995                crate::parsers::warn_if_non_canonical(&rel.relation);
996                let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
997                let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
998                if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
999                    let _ = conn.execute(
1000                        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1001                        rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1002                    );
1003                }
1004            }
1005
1006            // G01: embedding pipeline — enables recall to find memories created via --mode claude-code
1007            let body_text = String::from_utf8(file_content.clone())
1008                .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
1009            let snippet: String = body_text.chars().take(200).collect();
1010            let chunks_info = crate::chunking::split_into_chunks_hierarchical(&body_text);
1011
1012            // v1.0.89 (GAP-EMBED-PROPAGATION): honour --llm-backend via embed_passage_with_choice.
1013            let embedding_result = if chunks_info.len() <= 1 {
1014                crate::embedder::embed_passage_with_embedding_choice(
1015                    &paths.models,
1016                    &body_text,
1017                    embedding_backend,
1018                    llm_backend,
1019                )
1020                .map(|(v, _)| v)
1021            } else {
1022                let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
1023                let mut multi_ok = true;
1024                for chunk in &chunks_info {
1025                    let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
1026                    match crate::embedder::embed_passage_with_embedding_choice(
1027                        &paths.models,
1028                        chunk_text,
1029                        embedding_backend,
1030                        llm_backend,
1031                    )
1032                    .map(|(v, _)| v)
1033                    {
1034                        Ok(emb) => chunk_embeddings.push(emb),
1035                        Err(e) => {
1036                            tracing::warn!(
1037                                target: "ingest",
1038                                file = %file_path,
1039                                error = %e,
1040                                "chunk embedding failed, skipping vector index for this file"
1041                            );
1042                            multi_ok = false;
1043                            break;
1044                        }
1045                    }
1046                }
1047                if multi_ok {
1048                    let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
1049                    // persist per-chunk vectors
1050                    if let Err(e) = crate::storage::chunks::insert_chunk_slices(
1051                        &conn,
1052                        memory_id,
1053                        &body_text,
1054                        &chunks_info,
1055                    ) {
1056                        tracing::warn!(
1057                            target: "ingest",
1058                            file = %file_path,
1059                            error = %e,
1060                            "chunk slice insert failed"
1061                        );
1062                    } else {
1063                        for (i, emb) in chunk_embeddings.iter().enumerate() {
1064                            if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
1065                                &conn, i as i64, memory_id, i as i32, emb,
1066                            ) {
1067                                tracing::warn!(
1068                                    target: "ingest",
1069                                    file = %file_path,
1070                                    chunk = i,
1071                                    error = %e,
1072                                    "chunk vec upsert failed"
1073                                );
1074                            }
1075                        }
1076                    }
1077                    Ok(aggregated)
1078                } else {
1079                    // fallback: embed whole body for the memory-level vector
1080                    crate::embedder::embed_passage_with_embedding_choice(
1081                        &paths.models,
1082                        &body_text,
1083                        embedding_backend,
1084                        llm_backend,
1085                    )
1086                    .map(|(v, _)| v)
1087                }
1088            };
1089
1090            match embedding_result {
1091                Ok(embedding) => {
1092                    if let Err(e) = memories::upsert_vec(
1093                        &conn,
1094                        memory_id,
1095                        &namespace,
1096                        &memory_type_str,
1097                        &embedding,
1098                        name,
1099                        &snippet,
1100                    ) {
1101                        tracing::warn!(
1102                            target: "ingest",
1103                            file = %file_path,
1104                            error = %e,
1105                            "memory vec upsert failed; recall may not find this memory"
1106                        );
1107                    }
1108                    // embed each entity that was successfully upserted
1109                    for ent in &new_entities {
1110                        if let Ok(Some(eid)) =
1111                            entities::find_entity_id(&conn, &namespace, &ent.name)
1112                        {
1113                            let entity_text = ent.name.clone();
1114                            match crate::embedder::embed_passage_with_embedding_choice(
1115                                &paths.models,
1116                                &entity_text,
1117                                embedding_backend,
1118                                llm_backend,
1119                            )
1120                            .map(|(v, _)| v)
1121                            {
1122                                Ok(emb) => {
1123                                    if let Err(e) = entities::upsert_entity_vec(
1124                                        &conn,
1125                                        eid,
1126                                        &namespace,
1127                                        ent.entity_type,
1128                                        &emb,
1129                                        &ent.name,
1130                                    ) {
1131                                        tracing::warn!(
1132                                            target: "ingest",
1133                                            entity = %ent.name,
1134                                            error = %e,
1135                                            "entity vec upsert failed"
1136                                        );
1137                                    }
1138                                }
1139                                Err(e) => {
1140                                    tracing::warn!(
1141                                        target: "ingest",
1142                                        entity = %ent.name,
1143                                        error = %e,
1144                                        "entity embedding failed"
1145                                    );
1146                                }
1147                            }
1148                        }
1149                    }
1150                }
1151                Err(e) => {
1152                    tracing::warn!(
1153                        target: "ingest",
1154                        file = %file_path,
1155                        error = %e,
1156                        "memory embedding failed; recall will not find this memory"
1157                    );
1158                }
1159            }
1160
1161            let _ = queue_conn.execute(
1162                "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1163                rusqlite::params![
1164                    name,
1165                    memory_id,
1166                    ent_count,
1167                    rel_count,
1168                    cost,
1169                    file_started.elapsed().as_millis() as i64,
1170                    queue_id
1171                ],
1172            );
1173
1174            let current_index = completed + failed + skipped;
1175            completed += 1;
1176            entities_total += ent_count;
1177            rels_total += rel_count;
1178            if !is_oauth {
1179                cost_total += cost;
1180            }
1181
1182            emit_json(&FileEvent {
1183                file: &file_path,
1184                name,
1185                status: "done",
1186                memory_id: Some(memory_id),
1187                entities: Some(ent_count),
1188                rels: Some(rel_count),
1189                cost_usd: if is_oauth { None } else { Some(cost) },
1190                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1191                error: None,
1192                index: current_index,
1193                total,
1194            });
1195        } else if let Some(ref err_str) = last_extract_err {
1196            if last_was_rate_limited {
1197                if crate::retry::is_kill_switch_active() {
1198                    tracing::warn!(target: "ingest", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1199                } else if std::time::Instant::now() >= rate_limit_deadline {
1200                    tracing::error!(target: "ingest", "rate-limit retry deadline (1h) exhausted");
1201                } else {
1202                    let half = backoff_secs / 2;
1203                    let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1204                    let actual_wait = half + jitter;
1205                    tracing::warn!(target: "ingest", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
1206                    let _ = queue_conn.execute(
1207                        "UPDATE queue SET status='pending' WHERE id=?1",
1208                        rusqlite::params![queue_id],
1209                    );
1210                    std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1211                    backoff_secs = (backoff_secs * 2).min(900);
1212                    continue;
1213                }
1214            } else {
1215                let _ = queue_conn.execute(
1216                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1217                    rusqlite::params![err_str, queue_id],
1218                );
1219                let current_index = completed + failed + skipped;
1220                failed += 1;
1221                emit_json(&FileEvent {
1222                    file: &file_path,
1223                    name: "",
1224                    status: "failed",
1225                    memory_id: None,
1226                    entities: None,
1227                    rels: None,
1228                    cost_usd: None,
1229                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1230                    error: Some(err_str),
1231                    index: current_index,
1232                    total,
1233                });
1234                if args.fail_fast {
1235                    break;
1236                }
1237            }
1238        }
1239
1240        if let Some(budget) = args.max_cost_usd {
1241            if oauth_detected {
1242                tracing::debug!(target: "ingest", "--max-cost-usd ignored: OAuth subscription detected");
1243            } else if cost_total >= budget {
1244                tracing::warn!(
1245                    target: "ingest",
1246                    spent = cost_total,
1247                    budget = budget,
1248                    "budget exceeded, stopping"
1249                );
1250                break;
1251            }
1252        }
1253    }
1254
1255    // Stage 4: Summary
1256    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1257    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1258
1259    emit_json(&Summary {
1260        summary: true,
1261        files_total: total,
1262        completed,
1263        failed,
1264        skipped,
1265        entities_total,
1266        rels_total,
1267        cost_usd: cost_total,
1268        elapsed_ms: started.elapsed().as_millis() as u64,
1269    });
1270
1271    if !args.keep_queue && failed == 0 {
1272        let _ = std::fs::remove_file(&queue_path);
1273    }
1274
1275    Ok(())
1276}
1277
// Unit tests for the Claude CLI output parser, the binary lookup, and the
// extraction JSON schema constant.
#[cfg(test)]
mod tests {
    use super::*;

    /// `EXTRACTION_SCHEMA` must be syntactically valid JSON.
    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// A successful `result` event with `structured_output` yields the parsed
    /// extraction (name, entities, relationships) and the reported cost.
    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost, _is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    /// A `result` event flagged `is_error` becomes an `Err` whose message
    /// carries the text of the `error` field.
    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(err.to_string().contains("authentication failed"));
    }

    /// An error message mentioning `rate_limit` is surfaced as
    /// `AppError::RateLimited` rather than a generic error.
    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Input that is not JSON at all is rejected with an error.
    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    /// With `PATH` pointing at a nonexistent directory and the
    /// `SQLITE_GRAPHRAG_CLAUDE_BINARY` variable cleared, `find_claude_binary(None)`
    /// must fail.
    ///
    /// The environment is process-wide, so both variables are snapshotted
    /// beforehand and restored before asserting; tests running in parallel may
    /// still observe the temporary values while this test is executing.
    #[test]
    fn test_find_claude_binary_not_found() {
        let original_path = std::env::var_os("PATH");
        let original_override = std::env::var_os("SQLITE_GRAPHRAG_CLAUDE_BINARY");

        std::env::set_var("PATH", "/nonexistent");
        std::env::remove_var("SQLITE_GRAPHRAG_CLAUDE_BINARY");
        let result = find_claude_binary(None);

        // Restore PATH exactly: put the old value back, or remove the variable
        // if it was not set to begin with (otherwise "/nonexistent" would leak).
        match original_path {
            Some(p) => std::env::set_var("PATH", p),
            None => std::env::remove_var("PATH"),
        }
        // Restore the override variable if the caller's environment had one.
        if let Some(v) = original_override {
            std::env::set_var("SQLITE_GRAPHRAG_CLAUDE_BINARY", v);
        }

        assert!(result.is_err());
    }

    /// When `structured_output` is `null`, the parser falls back to the JSON
    /// document embedded as a string in the `result` field.
    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost, _is_oauth) =
            parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    /// An error event that carries its message in `result` (with no `error`
    /// field) still exposes that message in the returned error.
    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    /// A `terminal_reason` of `max_turns` without `is_error` does not prevent
    /// the structured output from being parsed successfully.
    #[test]
    fn test_terminal_reason_max_turns_detected() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"terminal_reason":"max_turns","structured_output":{"name":"t","description":"d","entities":[],"relationships":[]}}
        ]"#;
        let err_or_ok = parse_claude_output(output);
        assert!(
            err_or_ok.is_ok(),
            "max_turns in result without is_error should still parse"
        );
    }

    /// `apiKeySource: "none"` in the init event is reported as OAuth, and the
    /// cost from the result event is still returned.
    #[test]
    fn test_detect_oauth_from_init_json() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.50,"structured_output":{"name":"test-oauth","description":"oauth test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(is_oauth, "apiKeySource=none must be detected as OAuth");
        assert!((cost - 0.50).abs() < f64::EPSILON);
    }

    /// `apiKeySource: "env"` in the init event is not reported as OAuth.
    #[test]
    fn test_api_key_source_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"env"},
            {"type":"result","is_error":false,"total_cost_usd":0.10,"structured_output":{"name":"test-api","description":"api test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "apiKeySource=env must NOT be detected as OAuth");
    }

    /// An init event without `apiKeySource` defaults to "not OAuth".
    #[test]
    fn test_missing_api_key_source_defaults_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.05,"structured_output":{"name":"test-missing","description":"missing test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "missing apiKeySource must default to not OAuth");
    }

    /// Every `entity_type` value listed in the schema's enum must parse into
    /// an `EntityType`, so the schema cannot offer a label the code rejects.
    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value = serde_json::from_str(EXTRACTION_SCHEMA).unwrap();
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t.as_str().unwrap();
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}