Skip to main content

sqlite_graphrag/commands/
ingest_claude.rs

1//! Handler for `ingest --mode claude-code`.
2//!
3//! Orchestrates the locally installed Claude Code CLI binary (`claude -p`)
4//! to extract domain-specific entities and relationships from each file,
5//! then persists them via the same pipeline as `remember --graph-stdin`.
6//!
7//! Architecture: P1 One-Shot per file — each file spawns a separate
8//! `claude -p` process with `--json-schema` for guaranteed structured output.
9//! A SQLite queue DB tracks progress for resume/retry support.
10// Workload: Subprocess I/O-bound (claude -p headless with network wait)
11
12use crate::commands::ingest::IngestArgs;
13use crate::entity_type::EntityType;
14use crate::errors::AppError;
15use crate::paths::AppPaths;
16use crate::storage::connection::{ensure_db_ready, open_rw};
17use crate::storage::entities::{self, NewEntity, NewRelationship};
18use crate::storage::memories::{self, NewMemory};
19
20use rusqlite::Connection;
21use serde::{Deserialize, Serialize};
22use std::io::Write;
23use std::path::{Path, PathBuf};
24use std::process::{Command, Stdio};
25use std::time::Instant;
26
/// Lowest Claude Code CLI version accepted by `validate_claude_version`,
/// written as `MAJOR.MINOR.PATCH`.
const MIN_CLAUDE_VERSION: &str = "2.1.0";
28
/// JSON Schema handed to `claude -p` through the `--json-schema` flag.
///
/// Its top-level keys (`name`, `description`, `entities`, `relationships`)
/// mirror the fields of [`ExtractionResult`]. Entity types and relation
/// names are restricted to the enumerations listed here, and
/// `additionalProperties` is `false` at every object level.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
70
/// Instruction text passed as the argument of `claude -p`.
///
/// The document itself is not part of this prompt; `extract_with_claude`
/// streams the file bytes to the child process over stdin. The relation
/// names listed in the rules match the `relation` enum in
/// `EXTRACTION_SCHEMA`.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
84
/// One element of the JSON emitted by `claude -p --output-format json`.
///
/// Every field is optional (or defaulted) so that elements of any kind
/// deserialize; the parsing code then selects the elements it needs by
/// `type` / `subtype`.
#[derive(Debug, Deserialize)]
struct ClaudeOutputElement {
    /// Element kind; this module looks for `"system"` and `"result"`.
    r#type: Option<String>,
    /// Element sub-kind; `"init"` is matched on `system` elements.
    subtype: Option<String>,
    /// Error flag of the element; treated as `false` when the key is absent.
    #[serde(default)]
    is_error: bool,
    /// Schema-shaped extraction payload, when the element carries one.
    structured_output: Option<ExtractionResult>,
    /// Textual result; used as a fallback error message and as a fallback
    /// JSON source for the extraction.
    result: Option<String>,
    /// Reported cost in USD; callers fall back to `0.0` when absent.
    total_cost_usd: Option<f64>,
    /// Error message, preferred over `result` when reporting failures.
    error: Option<String>,
    /// Termination reason; `"max_turns"` is handled specially.
    terminal_reason: Option<String>,
    /// Deserialized from the `apiKeySource` key; the value `"none"` is
    /// interpreted as an OAuth session by `parse_claude_output`.
    #[serde(rename = "apiKeySource")]
    api_key_source: Option<String>,
}
99
/// Structured extraction produced for a single document.
///
/// The field set matches the top-level `required` keys of
/// `EXTRACTION_SCHEMA`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Name for the document (the prompt asks for short kebab-case).
    pub name: String,
    /// Description of the document (the prompt asks for one sentence).
    pub description: String,
    /// Entities extracted from the document.
    pub entities: Vec<ExtractedEntity>,
    /// Relationships between the extracted entities.
    pub relationships: Vec<ExtractedRelationship>,
}
107
/// A single entity returned by the extraction.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedEntity {
    /// Entity name (the prompt asks for lowercase kebab-case).
    pub name: String,
    /// Entity type as a raw string; `EXTRACTION_SCHEMA` restricts it to a
    /// fixed enumeration, but no validation happens at deserialization.
    pub entity_type: String,
}
113
/// A single typed relationship returned by the extraction.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExtractedRelationship {
    /// Name of the source entity.
    pub source: String,
    /// Name of the target entity.
    pub target: String,
    /// Relation name as a raw string; `EXTRACTION_SCHEMA` restricts it to a
    /// fixed enumeration.
    pub relation: String,
    /// Relationship strength; `EXTRACTION_SCHEMA` bounds it to `0..=1`.
    pub strength: f64,
}
121
/// Progress event serialized through `emit_json` at pipeline phase
/// boundaries (`"validate"` and `"scan"` in the visible code).
///
/// Every optional field is omitted from the JSON when it is `None`, so each
/// phase only reports the keys that apply to it.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase identifier.
    phase: &'a str,
    /// Path of the resolved Claude Code binary (validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    claude_path: Option<&'a str>,
    /// Version string reported by the binary (validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Directory being ingested (scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of files matched by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Number of files newly inserted into the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Number of files that were already present in the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
138
/// Per-file event serialized through `emit_json`.
///
/// Optional fields are omitted from the JSON when `None`.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the file the event refers to.
    file: &'a str,
    /// Memory name for the file; the failure/skip paths visible in this
    /// file pass an empty string.
    name: &'a str,
    /// Outcome label, e.g. `"preview"`, `"failed"` or `"skipped"`.
    status: &'a str,
    /// Identifier of the persisted memory, when one exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of entities associated with the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships associated with the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Cost in USD attributed to the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Wall-clock time spent on the file, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Error message for failed or skipped files.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Position of this file within the run.
    index: usize,
    /// Total number of files in the run.
    total: usize,
}
159
/// Final totals event serialized through `emit_json` at the end of a run.
#[derive(Debug, Serialize)]
struct Summary {
    /// Marker distinguishing the summary line from per-file events; the
    /// visible call site sets it to `true`.
    summary: bool,
    /// Number of files matched by the scan.
    files_total: usize,
    /// Number of files processed successfully.
    completed: usize,
    /// Number of files that failed.
    failed: usize,
    /// Number of files skipped.
    skipped: usize,
    /// Total entities across all files.
    entities_total: usize,
    /// Total relationships across all files.
    rels_total: usize,
    /// Accumulated cost in USD.
    cost_usd: f64,
    /// Wall-clock duration of the whole run, in milliseconds.
    elapsed_ms: u64,
}
172
173/// Locates the Claude Code binary on the system.
174pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
175    if let Some(p) = explicit {
176        if p.exists() {
177            return Ok(p.to_path_buf());
178        }
179        return Err(AppError::Validation(format!(
180            "Claude Code binary not found at explicit path: {}",
181            p.display()
182        )));
183    }
184
185    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
186        let p = PathBuf::from(&env_path);
187        if p.exists() {
188            return Ok(p);
189        }
190    }
191
192    let name = if cfg!(windows) {
193        "claude.exe"
194    } else {
195        "claude"
196    };
197    if let Some(path_var) = std::env::var_os("PATH") {
198        for dir in std::env::split_paths(&path_var) {
199            let candidate = dir.join(name);
200            if candidate.exists() {
201                return Ok(candidate);
202            }
203        }
204    }
205
206    Err(AppError::Validation(
207        "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
208    ))
209}
210
211/// Validates that the Claude Code binary meets the minimum version.
212fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
213    let output = Command::new(binary)
214        .arg("--version")
215        .stdin(Stdio::null())
216        .stdout(Stdio::piped())
217        .stderr(Stdio::piped())
218        .output()
219        .map_err(AppError::Io)?;
220
221    if !output.status.success() {
222        return Err(AppError::Validation(
223            "failed to run 'claude --version'".to_string(),
224        ));
225    }
226
227    let version_str = String::from_utf8(output.stdout)
228        .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
229    let version = version_str.trim().to_string();
230
231    // Extract the numeric version part before first space or paren, e.g. "2.1.149 (Claude Code)" -> "2.1.149"
232    let numeric = version.split([' ', '(']).next().unwrap_or("").trim();
233
234    fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
235        let parts: Vec<&str> = s.splitn(3, '.').collect();
236        if parts.len() < 2 {
237            return None;
238        }
239        let major = parts[0].parse::<u64>().ok()?;
240        let minor = parts[1].parse::<u64>().ok()?;
241        let patch = parts
242            .get(2)
243            .and_then(|p| p.parse::<u64>().ok())
244            .unwrap_or(0);
245        Some((major, minor, patch))
246    }
247
248    if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CLAUDE_VERSION)) {
249        if actual < min {
250            return Err(AppError::Validation(format!(
251                "Claude Code version {numeric} is below minimum required {MIN_CLAUDE_VERSION}"
252            )));
253        }
254    }
255
256    Ok(version)
257}
258
259/// Invokes `claude -p` for a single file and returns the extraction result.
260///
261/// OAuth-only enforcement (gaps.md:41-49, v1.0.69 mandate):
262///
263/// - `wait-timeout` for cross-platform subprocess timeout.
264/// - `env_clear()` for least-privilege environment.
265/// - OAuth-only flow: NO `--bare` (PROHIBITED, gaps.md:49), no API-key path.
266/// - Mandatory hardening: `--strict-mcp-config --mcp-config '{}'` to zero
267///   MCP servers, and `--settings '{"hooks":{}}'` to disable hooks.
268/// - If `ANTHROPIC_API_KEY` is set in the environment we ABORT the spawn
269///   (return a `false` command with a violation marker) — API-key path is
270///   PROHIBITED in this project.
271fn extract_with_claude(
272    binary: &Path,
273    file_content: &[u8],
274    model: Option<&str>,
275    timeout_secs: u64,
276) -> Result<(ExtractionResult, f64, bool), AppError> {
277    use wait_timeout::ChildExt;
278
279    // OAuth-only guard (gaps.md:47). If `ANTHROPIC_API_KEY` is set in the
280    // environment we MUST abort — that is the API-key path which is
281    // explicitly PROHIBITED. Use the OAuth flow exclusively.
282    if let Ok(_key) = std::env::var("ANTHROPIC_API_KEY") {
283        let mut cmd = Command::new("false");
284        cmd.env_clear();
285        cmd.env("PATH", "/nonexistent");
286        cmd.arg("--oauth-only-violation-anthropic-api-key-set");
287        return Err(AppError::Validation(
288            "ANTHROPIC_API_KEY is set in the environment; \
289             sqlite-graphrag operates exclusively with OAuth (Pro/Max) and \
290             the API-key path is PROHIBITED (gaps.md:47). Unset the variable \
291             and re-run with `claude login` already completed in this session."
292                .to_string(),
293        ));
294    }
295
296    let mut cmd = Command::new(binary);
297
298    cmd.env_clear();
299    for var in &[
300        "PATH",
301        "HOME",
302        "USER",
303        "SHELL",
304        "TERM",
305        "LANG",
306        "XDG_CONFIG_HOME",
307        "XDG_DATA_HOME",
308        "XDG_RUNTIME_DIR",
309        // NOTE: `ANTHROPIC_API_KEY` is INTENTIONALLY ABSENT (gaps.md:47).
310        "CLAUDE_CONFIG_DIR",
311        "TMPDIR",
312        "TMP",
313        "TEMP",
314        "DYLD_FALLBACK_LIBRARY_PATH",
315    ] {
316        if let Ok(val) = std::env::var(var) {
317            cmd.env(var, val);
318        }
319    }
320
321    #[cfg(windows)]
322    for var in &[
323        "LOCALAPPDATA",
324        "APPDATA",
325        "USERPROFILE",
326        "SystemRoot",
327        "COMSPEC",
328        "PATHEXT",
329        "HOMEPATH",
330        "HOMEDRIVE",
331    ] {
332        if let Ok(val) = std::env::var(var) {
333            cmd.env(var, val);
334        }
335    }
336
337    // Canonical OAuth-only command line (gaps.md:201-208 + 211-213).
338    // `--bare` is PROHIBITED (gaps.md:49) — never emitted.
339    cmd.arg("-p")
340        .arg(EXTRACTION_PROMPT)
341        .arg("--strict-mcp-config")
342        .arg("--mcp-config")
343        .arg("{}")
344        .arg("--dangerously-skip-permissions")
345        .arg("--settings")
346        .arg(r#"{"hooks":{}}"#)
347        .arg("--output-format")
348        .arg("json")
349        .arg("--json-schema")
350        .arg(EXTRACTION_SCHEMA)
351        .arg("--max-turns")
352        .arg("7")
353        .arg("--no-session-persistence");
354
355    if let Some(m) = model {
356        cmd.arg("--model").arg(m);
357    }
358
359    cmd.stdin(Stdio::piped())
360        .stdout(Stdio::piped())
361        .stderr(Stdio::piped());
362
363    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
364        AppError::Io(std::io::Error::new(
365            e.kind(),
366            format!("failed to spawn claude: {e}"),
367        ))
368    })?;
369
370    let stdin_data = file_content.to_vec();
371    let mut child_stdin = child
372        .stdin
373        .take()
374        .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
375    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
376        child_stdin.write_all(&stdin_data)?;
377        drop(child_stdin);
378        Ok(())
379    });
380
381    let start = std::time::Instant::now();
382    let timeout = std::time::Duration::from_secs(timeout_secs);
383    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
384
385    match status {
386        Some(exit_status) => {
387            stdin_thread
388                .join()
389                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
390                .map_err(AppError::Io)?;
391
392            tracing::debug!(
393                target: "process",
394                exit_code = ?exit_status.code(),
395                elapsed_ms = start.elapsed().as_millis() as u64,
396                "external process completed"
397            );
398
399            let mut stdout_buf = Vec::new();
400            let mut stderr_buf = Vec::new();
401            if let Some(mut out) = child.stdout.take() {
402                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
403            }
404            if let Some(mut err) = child.stderr.take() {
405                std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
406            }
407
408            if !exit_status.success() {
409                let stdout_str = String::from_utf8_lossy(&stdout_buf);
410                if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
411                {
412                    if let Some(re) = elements
413                        .iter()
414                        .find(|e| e.r#type.as_deref() == Some("result"))
415                    {
416                        if re.terminal_reason.as_deref() == Some("max_turns") {
417                            tracing::warn!(
418                                target: "ingest",
419                                "extraction hit max_turns limit — hooks may have consumed turns"
420                            );
421                            return Err(AppError::Validation(
422                                "claude -p hit max_turns: hooks may be consuming turns".into(),
423                            ));
424                        }
425                        if re.is_error {
426                            let err_msg = re
427                                .error
428                                .as_deref()
429                                .or(re.result.as_deref())
430                                .unwrap_or("unknown error");
431                            if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
432                                return Err(AppError::RateLimited {
433                                    detail: err_msg.to_string(),
434                                });
435                            }
436                            if err_msg.contains("Not logged in")
437                                || err_msg.contains("authentication")
438                            {
439                                tracing::warn!(
440                                    target: "ingest",
441                                    "Claude Code authentication failed. Re-authenticate interactively with: claude"
442                                );
443                            }
444                            return Err(AppError::Validation(format!(
445                                "claude -p failed: {err_msg}"
446                            )));
447                        }
448                    }
449                }
450                let stderr_str = String::from_utf8_lossy(&stderr_buf);
451                if stderr_str.contains("auth") || stderr_str.contains("login") {
452                    tracing::warn!(
453                        target: "ingest",
454                        "Claude Code authentication may have failed. Re-authenticate with: claude"
455                    );
456                }
457                return Err(AppError::Validation(format!(
458                    "claude -p exited with code {:?}: {}",
459                    exit_status.code(),
460                    stderr_str.trim()
461                )));
462            }
463
464            let stdout = String::from_utf8(stdout_buf)
465                .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
466            parse_claude_output(&stdout)
467        }
468        None => {
469            tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
470            let _ = child.kill();
471            let _ = child.wait();
472            let _ = stdin_thread.join();
473            Err(AppError::Validation(format!(
474                "claude -p timed out after {timeout_secs} seconds"
475            )))
476        }
477    }
478}
479
480/// Parses the JSON array output from `claude -p --output-format json`.
481///
482/// Returns `(extraction, cost_usd, is_oauth)` where `is_oauth` is true when
483/// the init element reports `apiKeySource: "none"` (OAuth subscription).
484fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64, bool), AppError> {
485    let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
486        AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
487    })?;
488
489    let is_oauth = elements
490        .iter()
491        .find(|e| e.r#type.as_deref() == Some("system") && e.subtype.as_deref() == Some("init"))
492        .and_then(|e| e.api_key_source.as_deref())
493        .map(|s| s == "none")
494        .unwrap_or(false);
495
496    let result_elem = elements
497        .iter()
498        .find(|e| e.r#type.as_deref() == Some("result"))
499        .ok_or_else(|| {
500            AppError::Validation("claude output missing 'result' element".to_string())
501        })?;
502
503    if result_elem.is_error {
504        let err_msg = result_elem
505            .error
506            .as_deref()
507            .or(result_elem.result.as_deref())
508            .unwrap_or("unknown error");
509        if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
510            return Err(AppError::RateLimited {
511                detail: err_msg.to_string(),
512            });
513        }
514        return Err(AppError::Validation(format!(
515            "claude extraction failed: {err_msg}"
516        )));
517    }
518
519    let extraction = result_elem
520        .structured_output
521        .clone()
522        .or_else(|| {
523            result_elem
524                .result
525                .as_ref()
526                .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
527        })
528        .ok_or_else(|| {
529            AppError::Validation("claude result missing structured_output and result field".into())
530        })?;
531
532    let cost = result_elem.total_cost_usd.unwrap_or(0.0);
533
534    Ok((extraction, cost, is_oauth))
535}
536
537use crate::output::emit_json_line as emit_json;
538
539/// Collects files matching the pattern (reuses ingest logic).
540fn collect_matching_files(
541    dir: &Path,
542    pattern: &str,
543    recursive: bool,
544    max_files: usize,
545) -> Result<Vec<PathBuf>, AppError> {
546    let mut files = Vec::new();
547    super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
548    files.sort_unstable();
549
550    if files.len() > max_files {
551        return Err(AppError::Validation(format!(
552            "found {} files, exceeds --max-files cap of {}",
553            files.len(),
554            max_files
555        )));
556    }
557
558    Ok(files)
559}
560
561/// Opens or creates the queue database for tracking ingest progress.
562fn open_queue_db(path: &str) -> Result<Connection, AppError> {
563    let conn = Connection::open(path)?;
564
565    conn.pragma_update(None, "journal_mode", "wal")?;
566
567    conn.execute_batch(
568        "CREATE TABLE IF NOT EXISTS queue (
569            id          INTEGER PRIMARY KEY AUTOINCREMENT,
570            file_path   TEXT NOT NULL UNIQUE,
571            name        TEXT,
572            status      TEXT NOT NULL DEFAULT 'pending',
573            memory_id   INTEGER,
574            entities    INTEGER DEFAULT 0,
575            rels        INTEGER DEFAULT 0,
576            error       TEXT,
577            cost_usd    REAL DEFAULT 0.0,
578            attempt     INTEGER DEFAULT 0,
579            elapsed_ms  INTEGER,
580            created_at  TEXT DEFAULT (datetime('now')),
581            done_at     TEXT
582        );
583        CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
584    )?;
585
586    Ok(conn)
587}
588
589/// Main entry point for `ingest --mode claude-code`.
590pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
591    let started = Instant::now();
592
593    if !args.dir.exists() {
594        return Err(AppError::Validation(format!(
595            "directory not found: {}",
596            args.dir.display()
597        )));
598    }
599
600    // G28-B (v1.0.68) + G30 (v1.0.69): acquire singleton before doing real
601    // work so two parallel `ingest --mode claude-code` invocations cannot
602    // co-exist on the same database. Scope includes the database hash so
603    // concurrent ingest against different databases is allowed.
604    let early_ns = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
605    let early_paths = AppPaths::resolve(args.db.as_deref())?;
606    let _singleton = crate::lock::acquire_job_singleton(
607        crate::lock::JobType::IngestClaudeCode,
608        &early_ns,
609        &early_paths.db,
610        args.wait_job_singleton,
611        args.force_job_singleton,
612    )?;
613
614    // Stage 1: Validate
615    let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
616    let version = validate_claude_version(&claude_binary)?;
617    tracing::info!(
618        target: "ingest",
619        binary = %claude_binary.display(),
620        version = %version,
621        "Claude Code binary validated"
622    );
623
624    emit_json(&PhaseEvent {
625        phase: "validate",
626        claude_path: claude_binary.to_str(),
627        version: Some(&version),
628        dir: None,
629        files_total: None,
630        files_new: None,
631        files_existing: None,
632    });
633
634    // Stage 2: Scan
635    let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
636
637    let queue_conn = open_queue_db(&args.queue_db)?;
638
639    if args.resume {
640        let reset = queue_conn
641            .execute(
642                "UPDATE queue SET status='pending' WHERE status='processing'",
643                [],
644            )
645            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
646        if reset > 0 {
647            tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
648        }
649    }
650
651    if args.retry_failed {
652        let count = queue_conn
653            .execute(
654                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
655                [],
656            )
657            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
658        tracing::info!(target: "ingest", count, "retrying failed files");
659    }
660
661    if !args.resume && !args.retry_failed {
662        queue_conn
663            .execute("DELETE FROM queue", [])
664            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
665    }
666
667    let mut new_count = 0usize;
668    let mut existing_count = 0usize;
669
670    if !args.retry_failed {
671        for file in &files {
672            let file_str = file.to_string_lossy().into_owned();
673            let inserted = queue_conn
674                .execute(
675                    "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
676                    rusqlite::params![file_str],
677                )
678                .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
679            if inserted > 0 {
680                new_count += 1;
681            } else {
682                existing_count += 1;
683            }
684        }
685    }
686
687    emit_json(&PhaseEvent {
688        phase: "scan",
689        claude_path: None,
690        version: None,
691        dir: args.dir.to_str(),
692        files_total: Some(files.len()),
693        files_new: Some(new_count),
694        files_existing: Some(existing_count),
695    });
696
697    if args.dry_run {
698        for (idx, file) in files.iter().enumerate() {
699            let (name, _truncated, _orig) =
700                super::ingest::derive_kebab_name(file, args.max_name_length);
701            emit_json(&FileEvent {
702                file: &file.to_string_lossy(),
703                name: &name,
704                status: "preview",
705                memory_id: None,
706                entities: None,
707                rels: None,
708                cost_usd: None,
709                elapsed_ms: None,
710                error: None,
711                index: idx,
712                total: files.len(),
713            });
714        }
715        emit_json(&Summary {
716            summary: true,
717            files_total: files.len(),
718            completed: 0,
719            failed: 0,
720            skipped: 0,
721            entities_total: 0,
722            rels_total: 0,
723            cost_usd: 0.0,
724            elapsed_ms: started.elapsed().as_millis() as u64,
725        });
726        if !args.keep_queue {
727            let _ = std::fs::remove_file(&args.queue_db);
728        }
729        return Ok(());
730    }
731
732    // Stage 3: Process
733    let paths = AppPaths::resolve(args.db.as_deref())?;
734    ensure_db_ready(&paths)?;
735    let conn = open_rw(&paths.db)?;
736    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
737    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
738    let memory_type_str = args.r#type.as_str().to_string();
739
740    let mut completed = 0usize;
741    let mut failed = 0usize;
742    let skipped_initial: usize = queue_conn
743        .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
744            r.get::<_, usize>(0)
745        })
746        .unwrap_or(0);
747    let mut skipped = skipped_initial;
748    let mut entities_total = 0usize;
749    let mut rels_total = 0usize;
750    let mut cost_total = 0.0f64;
751    let mut oauth_detected = false;
752    let total = files.len();
753
754    let mut backoff_secs = args.rate_limit_wait;
755    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
756
757    loop {
758        if crate::shutdown_requested() {
759            tracing::info!(target: "ingest", "shutdown requested, stopping before next file");
760            break;
761        }
762
763        let pending: Option<(i64, String)> = queue_conn
764            .query_row(
765                "UPDATE queue SET status='processing', attempt=attempt+1 \
766                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
767                 RETURNING id, file_path",
768                [],
769                |row| Ok((row.get(0)?, row.get(1)?)),
770            )
771            .ok();
772
773        let (queue_id, file_path) = match pending {
774            Some(p) => p,
775            None => break,
776        };
777
778        let file_started = Instant::now();
779
780        // G05: reject files that exceed the 10 MB stdin limit
781        const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
782        if let Ok(meta) = std::fs::metadata(&file_path) {
783            if meta.len() > MAX_FILE_SIZE {
784                let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
785                let _ = queue_conn.execute(
786                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
787                    rusqlite::params![err_msg, queue_id],
788                );
789                let current_index = completed + failed + skipped;
790                failed += 1;
791                emit_json(&FileEvent {
792                    file: &file_path,
793                    name: "",
794                    status: "failed",
795                    memory_id: None,
796                    entities: None,
797                    rels: None,
798                    cost_usd: None,
799                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
800                    error: Some(&err_msg),
801                    index: current_index,
802                    total,
803                });
804                if args.fail_fast {
805                    break;
806                }
807                continue;
808            }
809        }
810
811        let file_content = match std::fs::read(&file_path) {
812            Ok(c) => c,
813            Err(e) => {
814                let err_msg = format!("IO error: {e}");
815                let _ = queue_conn.execute(
816                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
817                    rusqlite::params![err_msg, queue_id],
818                );
819                let current_index = completed + failed + skipped;
820                failed += 1;
821                emit_json(&FileEvent {
822                    file: &file_path,
823                    name: "",
824                    status: "failed",
825                    memory_id: None,
826                    entities: None,
827                    rels: None,
828                    cost_usd: None,
829                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
830                    error: Some(&err_msg),
831                    index: current_index,
832                    total,
833                });
834                if args.fail_fast {
835                    break;
836                }
837                continue;
838            }
839        };
840
841        // B08: skip files exceeding body cap BEFORE sending to LLM to avoid wasting tokens
842        if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
843            let err_msg = format!(
844                "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
845                crate::constants::MAX_MEMORY_BODY_LEN,
846                file_content.len()
847            );
848            tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
849            let _ = queue_conn.execute(
850                "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
851                rusqlite::params![err_msg, queue_id],
852            );
853            let current_index = completed + failed + skipped;
854            skipped += 1;
855            emit_json(&FileEvent {
856                file: &file_path,
857                name: "",
858                status: "skipped",
859                memory_id: None,
860                entities: None,
861                rels: None,
862                cost_usd: None,
863                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
864                error: Some(&err_msg),
865                index: current_index,
866                total,
867            });
868            continue;
869        }
870
871        // B07: retry once on cold-start failure (Claude Code Issue #23265)
872        let max_extract_attempts: u32 = 2;
873        let mut extraction_result: Option<(ExtractionResult, f64, bool)> = None;
874        let mut last_extract_err: Option<String> = None;
875        let mut last_was_rate_limited = false;
876
877        for attempt in 1..=max_extract_attempts {
878            match extract_with_claude(
879                &claude_binary,
880                &file_content,
881                args.claude_model.as_deref(),
882                args.claude_timeout,
883            ) {
884                Ok(result) => {
885                    extraction_result = Some(result);
886                    break;
887                }
888                Err(ref e) if matches!(e, AppError::RateLimited { .. }) => {
889                    last_extract_err = Some(format!("{e}"));
890                    last_was_rate_limited = true;
891                    break;
892                }
893                Err(e) => {
894                    let msg = format!("{e}");
895                    if attempt < max_extract_attempts {
896                        let cold_start_delay = 2 * attempt as u64;
897                        tracing::warn!(target: "ingest", attempt, delay_secs = cold_start_delay, error = %msg, "extraction failed, retrying (cold-start workaround)");
898                        std::thread::sleep(std::time::Duration::from_secs(cold_start_delay));
899                    }
900                    last_extract_err = Some(msg);
901                }
902            }
903        }
904
905        if let Some((extraction, cost, is_oauth)) = extraction_result {
906            if is_oauth && !oauth_detected {
907                oauth_detected = true;
908                tracing::info!(target: "ingest", "OAuth subscription detected — cost_usd omitted from output");
909            }
910            backoff_secs = args.rate_limit_wait;
911
912            let (normalized_name, _truncated, _orig) = crate::commands::ingest::derive_kebab_name(
913                std::path::Path::new(&extraction.name),
914                args.max_name_length,
915            );
916            let name = &normalized_name;
917            let ent_count = extraction.entities.len();
918            let rel_count = extraction.relationships.len();
919
920            let new_entities: Vec<NewEntity> = extraction
921                .entities
922                .iter()
923                .filter_map(|e| match e.entity_type.parse::<EntityType>() {
924                    Ok(et) => Some(NewEntity {
925                        name: e.name.clone(),
926                        entity_type: et,
927                        description: None,
928                    }),
929                    Err(_) => {
930                        tracing::warn!(
931                            target: "ingest",
932                            entity = %e.name,
933                            entity_type = %e.entity_type,
934                            "entity type not recognized, skipping"
935                        );
936                        None
937                    }
938                })
939                .collect();
940
941            let new_relationships: Vec<NewRelationship> = extraction
942                .relationships
943                .iter()
944                .map(|r| NewRelationship {
945                    source: r.source.clone(),
946                    target: r.target.clone(),
947                    relation: crate::parsers::normalize_relation(&r.relation),
948                    strength: r.strength,
949                    description: None,
950                })
951                .collect();
952
953            let body_str = String::from_utf8_lossy(&file_content);
954            let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
955            let new_memory = NewMemory {
956                name: name.clone(),
957                namespace: namespace.clone(),
958                memory_type: memory_type_str.clone(),
959                description: extraction.description.clone(),
960                body: body_str.to_string(),
961                body_hash,
962                session_id: None,
963                source: "agent".to_string(),
964                metadata: serde_json::Value::Object(serde_json::Map::new()),
965            };
966
967            // B06: deduplication — update existing memory instead of failing on UNIQUE
968            let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
969                Some((existing_id, is_deleted)) => {
970                    if is_deleted {
971                        memories::clear_deleted_at(&conn, existing_id)?;
972                    }
973                    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
974                        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
975                        rusqlite::params![existing_id],
976                        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
977                    )?;
978                    memories::update(&conn, existing_id, &new_memory, None)?;
979                    memories::sync_fts_after_update(
980                        &conn,
981                        existing_id,
982                        &old_name,
983                        &old_desc,
984                        &old_body,
985                        &new_memory.name,
986                        &new_memory.description,
987                        &new_memory.body,
988                    )?;
989                    tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
990                    existing_id
991                }
992                None => match memories::insert(&conn, &new_memory) {
993                    Ok(id) => id,
994                    Err(e) => {
995                        let err_msg = format!("{e}");
996                        let _ = queue_conn.execute(
997                                "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
998                                rusqlite::params![err_msg, queue_id],
999                            );
1000                        let current_index = completed + failed + skipped;
1001                        failed += 1;
1002                        emit_json(&FileEvent {
1003                            file: &file_path,
1004                            name,
1005                            status: "failed",
1006                            memory_id: None,
1007                            entities: None,
1008                            rels: None,
1009                            cost_usd: if is_oauth { None } else { Some(cost) },
1010                            elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1011                            error: Some(&err_msg),
1012                            index: current_index,
1013                            total,
1014                        });
1015                        if !is_oauth {
1016                            cost_total += cost;
1017                        }
1018                        if args.fail_fast {
1019                            break;
1020                        }
1021                        continue;
1022                    }
1023                },
1024            };
1025
1026            for ent in &new_entities {
1027                match entities::upsert_entity(&conn, &namespace, ent) {
1028                    Ok(eid) => {
1029                        let _ = entities::link_memory_entity(&conn, memory_id, eid);
1030                    }
1031                    Err(e) => {
1032                        tracing::warn!(
1033                            target: "ingest",
1034                            entity = %ent.name,
1035                            error = %e,
1036                            "entity skipped due to validation error"
1037                        );
1038                    }
1039                }
1040            }
1041            for rel in &new_relationships {
1042                crate::parsers::warn_if_non_canonical(&rel.relation);
1043                let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
1044                let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
1045                if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1046                    let _ = conn.execute(
1047                        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1048                        rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1049                    );
1050                }
1051            }
1052
1053            // G01: embedding pipeline — enables recall to find memories created via --mode claude-code
1054            let body_text = String::from_utf8_lossy(&file_content).into_owned();
1055            let snippet: String = body_text.chars().take(200).collect();
1056            let chunks_info =
1057                crate::chunking::split_into_chunks_hierarchical(&body_text, tokenizer);
1058
1059            let embedding_result = if chunks_info.len() <= 1 {
1060                crate::daemon::embed_passage_or_local(&paths.models, &body_text)
1061            } else {
1062                let mut chunk_embeddings: Vec<Vec<f32>> = Vec::with_capacity(chunks_info.len());
1063                let mut multi_ok = true;
1064                for chunk in &chunks_info {
1065                    let chunk_text = crate::chunking::chunk_text(&body_text, chunk);
1066                    match crate::daemon::embed_passage_or_local(&paths.models, chunk_text) {
1067                        Ok(emb) => chunk_embeddings.push(emb),
1068                        Err(e) => {
1069                            tracing::warn!(
1070                                target: "ingest",
1071                                file = %file_path,
1072                                error = %e,
1073                                "chunk embedding failed, skipping vector index for this file"
1074                            );
1075                            multi_ok = false;
1076                            break;
1077                        }
1078                    }
1079                }
1080                if multi_ok {
1081                    let aggregated = crate::chunking::aggregate_embeddings(&chunk_embeddings);
1082                    // persist per-chunk vectors
1083                    if let Err(e) = crate::storage::chunks::insert_chunk_slices(
1084                        &conn,
1085                        memory_id,
1086                        &body_text,
1087                        &chunks_info,
1088                    ) {
1089                        tracing::warn!(
1090                            target: "ingest",
1091                            file = %file_path,
1092                            error = %e,
1093                            "chunk slice insert failed"
1094                        );
1095                    } else {
1096                        for (i, emb) in chunk_embeddings.iter().enumerate() {
1097                            if let Err(e) = crate::storage::chunks::upsert_chunk_vec(
1098                                &conn, i as i64, memory_id, i as i32, emb,
1099                            ) {
1100                                tracing::warn!(
1101                                    target: "ingest",
1102                                    file = %file_path,
1103                                    chunk = i,
1104                                    error = %e,
1105                                    "chunk vec upsert failed"
1106                                );
1107                            }
1108                        }
1109                    }
1110                    Ok(aggregated)
1111                } else {
1112                    // fallback: embed whole body for the memory-level vector
1113                    crate::daemon::embed_passage_or_local(&paths.models, &body_text)
1114                }
1115            };
1116
1117            match embedding_result {
1118                Ok(embedding) => {
1119                    if let Err(e) = memories::upsert_vec(
1120                        &conn,
1121                        memory_id,
1122                        &namespace,
1123                        &memory_type_str,
1124                        &embedding,
1125                        name,
1126                        &snippet,
1127                    ) {
1128                        tracing::warn!(
1129                            target: "ingest",
1130                            file = %file_path,
1131                            error = %e,
1132                            "memory vec upsert failed; recall may not find this memory"
1133                        );
1134                    }
1135                    // embed each entity that was successfully upserted
1136                    for ent in &new_entities {
1137                        if let Ok(Some(eid)) =
1138                            entities::find_entity_id(&conn, &namespace, &ent.name)
1139                        {
1140                            let entity_text = ent.name.clone();
1141                            match crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
1142                            {
1143                                Ok(emb) => {
1144                                    if let Err(e) = entities::upsert_entity_vec(
1145                                        &conn,
1146                                        eid,
1147                                        &namespace,
1148                                        ent.entity_type,
1149                                        &emb,
1150                                        &ent.name,
1151                                    ) {
1152                                        tracing::warn!(
1153                                            target: "ingest",
1154                                            entity = %ent.name,
1155                                            error = %e,
1156                                            "entity vec upsert failed"
1157                                        );
1158                                    }
1159                                }
1160                                Err(e) => {
1161                                    tracing::warn!(
1162                                        target: "ingest",
1163                                        entity = %ent.name,
1164                                        error = %e,
1165                                        "entity embedding failed"
1166                                    );
1167                                }
1168                            }
1169                        }
1170                    }
1171                }
1172                Err(e) => {
1173                    tracing::warn!(
1174                        target: "ingest",
1175                        file = %file_path,
1176                        error = %e,
1177                        "memory embedding failed; recall will not find this memory"
1178                    );
1179                }
1180            }
1181
1182            let _ = queue_conn.execute(
1183                "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
1184                rusqlite::params![
1185                    name,
1186                    memory_id,
1187                    ent_count,
1188                    rel_count,
1189                    cost,
1190                    file_started.elapsed().as_millis() as i64,
1191                    queue_id
1192                ],
1193            );
1194
1195            let current_index = completed + failed + skipped;
1196            completed += 1;
1197            entities_total += ent_count;
1198            rels_total += rel_count;
1199            if !is_oauth {
1200                cost_total += cost;
1201            }
1202
1203            emit_json(&FileEvent {
1204                file: &file_path,
1205                name,
1206                status: "done",
1207                memory_id: Some(memory_id),
1208                entities: Some(ent_count),
1209                rels: Some(rel_count),
1210                cost_usd: if is_oauth { None } else { Some(cost) },
1211                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1212                error: None,
1213                index: current_index,
1214                total,
1215            });
1216        } else if let Some(ref err_str) = last_extract_err {
1217            if last_was_rate_limited {
1218                if crate::retry::is_kill_switch_active() {
1219                    tracing::warn!(target: "ingest", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1220                } else if std::time::Instant::now() >= rate_limit_deadline {
1221                    tracing::error!(target: "ingest", "rate-limit retry deadline (1h) exhausted");
1222                } else {
1223                    let half = backoff_secs / 2;
1224                    let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1225                    let actual_wait = half + jitter;
1226                    tracing::warn!(target: "ingest", delay_secs = actual_wait, error_kind = "rate_limited", "rate limited, backing off");
1227                    let _ = queue_conn.execute(
1228                        "UPDATE queue SET status='pending' WHERE id=?1",
1229                        rusqlite::params![queue_id],
1230                    );
1231                    std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1232                    backoff_secs = (backoff_secs * 2).min(900);
1233                    continue;
1234                }
1235            } else {
1236                let _ = queue_conn.execute(
1237                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1238                    rusqlite::params![err_str, queue_id],
1239                );
1240                let current_index = completed + failed + skipped;
1241                failed += 1;
1242                emit_json(&FileEvent {
1243                    file: &file_path,
1244                    name: "",
1245                    status: "failed",
1246                    memory_id: None,
1247                    entities: None,
1248                    rels: None,
1249                    cost_usd: None,
1250                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1251                    error: Some(err_str),
1252                    index: current_index,
1253                    total,
1254                });
1255                if args.fail_fast {
1256                    break;
1257                }
1258            }
1259        }
1260
1261        if let Some(budget) = args.max_cost_usd {
1262            if oauth_detected {
1263                tracing::debug!(target: "ingest", "--max-cost-usd ignored: OAuth subscription detected");
1264            } else if cost_total >= budget {
1265                tracing::warn!(
1266                    target: "ingest",
1267                    spent = cost_total,
1268                    budget = budget,
1269                    "budget exceeded, stopping"
1270                );
1271                break;
1272            }
1273        }
1274    }
1275
1276    // Stage 4: Summary
1277    let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1278    let _ = queue_conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1279
1280    emit_json(&Summary {
1281        summary: true,
1282        files_total: total,
1283        completed,
1284        failed,
1285        skipped,
1286        entities_total,
1287        rels_total,
1288        cost_usd: cost_total,
1289        elapsed_ms: started.elapsed().as_millis() as u64,
1290    });
1291
1292    if !args.keep_queue && failed == 0 {
1293        let _ = std::fs::remove_file(&args.queue_db);
1294    }
1295
1296    Ok(())
1297}
1298
#[cfg(test)]
mod tests {
    use super::*;

    /// Scoped override of a single process environment variable.
    ///
    /// The value present at construction time is captured and put back when the
    /// guard is dropped (the variable is removed again if it was absent), so the
    /// override is undone even when the test body panics while the guard is alive.
    struct EnvVarGuard {
        key: &'static str,
        saved: Option<std::ffi::OsString>,
    }

    impl EnvVarGuard {
        /// Sets `key` to `value`, remembering whatever was there before.
        fn set(key: &'static str, value: &str) -> Self {
            let saved = std::env::var_os(key);
            std::env::set_var(key, value);
            Self { key, saved }
        }

        /// Removes `key`, remembering whatever was there before.
        fn unset(key: &'static str) -> Self {
            let saved = std::env::var_os(key);
            std::env::remove_var(key);
            Self { key, saved }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.saved.take() {
                Some(previous) => std::env::set_var(self.key, previous),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// The embedded extraction schema must be syntactically valid JSON.
    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// A successful `result` event with `structured_output` yields the extraction and its cost.
    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost, _is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    /// An `is_error` result surfaces the text of its `error` field in the returned error.
    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(format!("{err}").contains("authentication failed"));
    }

    /// An error mentioning `rate_limit` is mapped to `AppError::RateLimited`.
    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(matches!(err, AppError::RateLimited { .. }));
    }

    /// Input that is not JSON at all is rejected.
    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    /// With an unusable `PATH` and no binary override variable, lookup must fail.
    ///
    /// Both variables are changed through `EnvVarGuard`, so they are restored to
    /// their prior state (including "unset") once the lookup finishes or panics.
    /// NOTE(review): the environment is process-wide; tests running in parallel
    /// that read `PATH` could still observe the temporary override.
    #[test]
    fn test_find_claude_binary_not_found() {
        let result = {
            let _path = EnvVarGuard::set("PATH", "/nonexistent");
            let _binary_override = EnvVarGuard::unset("SQLITE_GRAPHRAG_CLAUDE_BINARY");
            find_claude_binary(None)
        };
        assert!(result.is_err());
    }

    /// When `structured_output` is null, the JSON string in `result` is parsed instead.
    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost, _is_oauth) =
            parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    /// An `is_error` result that carries its message in `result` (not `error`) still reports it.
    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    /// A `terminal_reason` of `max_turns` without `is_error` must not prevent parsing.
    #[test]
    fn test_terminal_reason_max_turns_detected() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"terminal_reason":"max_turns","structured_output":{"name":"t","description":"d","entities":[],"relationships":[]}}
        ]"#;
        let err_or_ok = parse_claude_output(output);
        assert!(
            err_or_ok.is_ok(),
            "max_turns in result without is_error should still parse"
        );
    }

    /// `apiKeySource: "none"` in the init event is reported as OAuth; cost is still returned.
    #[test]
    fn test_detect_oauth_from_init_json() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"none"},
            {"type":"result","is_error":false,"total_cost_usd":0.50,"structured_output":{"name":"test-oauth","description":"oauth test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(is_oauth, "apiKeySource=none must be detected as OAuth");
        assert!((cost - 0.50).abs() < f64::EPSILON);
    }

    /// `apiKeySource: "env"` is not treated as OAuth.
    #[test]
    fn test_api_key_source_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init","apiKeySource":"env"},
            {"type":"result","is_error":false,"total_cost_usd":0.10,"structured_output":{"name":"test-api","description":"api test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "apiKeySource=env must NOT be detected as OAuth");
    }

    /// An init event without `apiKeySource` defaults to "not OAuth".
    #[test]
    fn test_missing_api_key_source_defaults_not_oauth() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.05,"structured_output":{"name":"test-missing","description":"missing test","entities":[],"relationships":[]}}
        ]"#;
        let (_result, _cost, is_oauth) = parse_claude_output(output).expect("parse must succeed");
        assert!(!is_oauth, "missing apiKeySource must default to not OAuth");
    }

    /// Every `entity_type` value allowed by the schema must parse as an `EntityType`.
    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t
                .as_str()
                .expect("schema entity_type enum values must be strings");
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}