Skip to main content

sqlite_graphrag/commands/
ingest_claude.rs

//! Handler for `ingest --mode claude-code`.
//!
//! Orchestrates the locally installed Claude Code CLI binary (`claude -p`)
//! to extract domain-specific entities and relationships from each file,
//! then persists them via the same pipeline as `remember --graph-stdin`.
//!
//! Architecture: P1 One-Shot per file — each file spawns a separate
//! `claude -p` process with `--json-schema` for guaranteed structured output.
//! A SQLite queue DB tracks progress for resume/retry support.
10
11use crate::commands::ingest::IngestArgs;
12use crate::entity_type::EntityType;
13use crate::errors::AppError;
14use crate::paths::AppPaths;
15use crate::storage::connection::{ensure_db_ready, open_rw};
16use crate::storage::entities::{self, NewEntity, NewRelationship};
17use crate::storage::memories::{self, NewMemory};
18
19use rusqlite::Connection;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::{Path, PathBuf};
23use std::process::{Command, Stdio};
24use std::time::Instant;
25
/// Version string `"2.1.0"` recorded for the Claude Code CLI.
///
/// NOTE(review): presumably the minimum supported CLI version. Nothing in the
/// visible part of this module compares against it (`validate_claude_version`
/// only reports the version it reads), which is why `dead_code` is allowed.
#[allow(dead_code)]
const MIN_CLAUDE_VERSION: &str = "2.1.0";
28
/// JSON Schema handed to `claude -p` through `--json-schema`.
///
/// It describes the object deserialized into [`ExtractionResult`]: a `name`,
/// a `description`, a list of typed `entities`, and a list of typed
/// `relationships`, each relationship carrying a `strength` in `[0, 1]`.
const EXTRACTION_SCHEMA: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"]
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"]
      }
    }
  },
  "required": ["name","description","entities","relationships"]
}"#;
67
/// Prompt passed to `claude -p`; the document itself is supplied on stdin.
///
/// Written one line per `concat!` argument so each instruction is easy to
/// diff. The relationship types listed here mirror the `relation` enum in
/// `EXTRACTION_SCHEMA`.
const EXTRACTION_PROMPT: &str = concat!(
    "You are a knowledge graph entity extractor. Given a document, extract:\n",
    "1. A short kebab-case name (max 60 chars) capturing the document's main topic\n",
    "2. A one-sentence description (10-20 words) summarizing the key insight\n",
    "3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n",
    "4. Typed relationships between entities with strength scores\n",
    "\n",
    "Rules:\n",
    "- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n",
    "- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n",
    "- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n",
    "- NEVER use 'mentions' as relationship type\n",
    "- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n",
    "- Prefer fewer high-quality entities over many low-quality ones\n",
    "- Description must answer: What is this about and WHY does it matter?",
);
81
82#[derive(Debug, Deserialize)]
83struct ClaudeOutputElement {
84    r#type: Option<String>,
85    #[allow(dead_code)]
86    subtype: Option<String>,
87    #[serde(default)]
88    is_error: bool,
89    structured_output: Option<ExtractionResult>,
90    result: Option<String>,
91    total_cost_usd: Option<f64>,
92    error: Option<String>,
93}
94
95#[derive(Debug, Clone, Deserialize, Serialize)]
96pub struct ExtractionResult {
97    pub name: String,
98    pub description: String,
99    pub entities: Vec<ExtractedEntity>,
100    pub relationships: Vec<ExtractedRelationship>,
101}
102
103#[derive(Debug, Clone, Deserialize, Serialize)]
104pub struct ExtractedEntity {
105    pub name: String,
106    pub entity_type: String,
107}
108
109#[derive(Debug, Clone, Deserialize, Serialize)]
110pub struct ExtractedRelationship {
111    pub source: String,
112    pub target: String,
113    pub relation: String,
114    pub strength: f64,
115}
116
117#[derive(Debug, Serialize)]
118struct PhaseEvent<'a> {
119    phase: &'a str,
120    #[serde(skip_serializing_if = "Option::is_none")]
121    claude_path: Option<&'a str>,
122    #[serde(skip_serializing_if = "Option::is_none")]
123    version: Option<&'a str>,
124    #[serde(skip_serializing_if = "Option::is_none")]
125    dir: Option<&'a str>,
126    #[serde(skip_serializing_if = "Option::is_none")]
127    files_total: Option<usize>,
128    #[serde(skip_serializing_if = "Option::is_none")]
129    files_new: Option<usize>,
130    #[serde(skip_serializing_if = "Option::is_none")]
131    files_existing: Option<usize>,
132}
133
134#[derive(Debug, Serialize)]
135struct FileEvent<'a> {
136    file: &'a str,
137    name: &'a str,
138    status: &'a str,
139    #[serde(skip_serializing_if = "Option::is_none")]
140    memory_id: Option<i64>,
141    #[serde(skip_serializing_if = "Option::is_none")]
142    entities: Option<usize>,
143    #[serde(skip_serializing_if = "Option::is_none")]
144    rels: Option<usize>,
145    #[serde(skip_serializing_if = "Option::is_none")]
146    cost_usd: Option<f64>,
147    #[serde(skip_serializing_if = "Option::is_none")]
148    elapsed_ms: Option<u64>,
149    #[serde(skip_serializing_if = "Option::is_none")]
150    error: Option<&'a str>,
151    index: usize,
152    total: usize,
153}
154
155#[derive(Debug, Serialize)]
156struct Summary {
157    summary: bool,
158    files_total: usize,
159    completed: usize,
160    failed: usize,
161    skipped: usize,
162    entities_total: usize,
163    rels_total: usize,
164    cost_usd: f64,
165    elapsed_ms: u64,
166}
167
168/// Locates the Claude Code binary on the system.
169pub fn find_claude_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
170    if let Some(p) = explicit {
171        if p.exists() {
172            return Ok(p.to_path_buf());
173        }
174        return Err(AppError::Validation(format!(
175            "Claude Code binary not found at explicit path: {}",
176            p.display()
177        )));
178    }
179
180    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CLAUDE_BINARY") {
181        let p = PathBuf::from(&env_path);
182        if p.exists() {
183            return Ok(p);
184        }
185    }
186
187    let name = if cfg!(windows) {
188        "claude.exe"
189    } else {
190        "claude"
191    };
192    if let Some(path_var) = std::env::var_os("PATH") {
193        for dir in std::env::split_paths(&path_var) {
194            let candidate = dir.join(name);
195            if candidate.exists() {
196                return Ok(candidate);
197            }
198        }
199    }
200
201    Err(AppError::Validation(
202        "Claude Code binary not found in PATH. Install it from https://docs.anthropic.com/claude-code or specify --claude-binary".to_string(),
203    ))
204}
205
206/// Validates that the Claude Code binary meets the minimum version.
207fn validate_claude_version(binary: &Path) -> Result<String, AppError> {
208    let output = Command::new(binary)
209        .arg("--version")
210        .stdin(Stdio::null())
211        .stdout(Stdio::piped())
212        .stderr(Stdio::piped())
213        .output()
214        .map_err(AppError::Io)?;
215
216    if !output.status.success() {
217        return Err(AppError::Validation(
218            "failed to run 'claude --version'".to_string(),
219        ));
220    }
221
222    let version_str = String::from_utf8(output.stdout)
223        .map_err(|_| AppError::Validation("claude --version output is not UTF-8".to_string()))?;
224    let version = version_str.trim().to_string();
225
226    Ok(version)
227}
228
229/// Invokes `claude -p` for a single file and returns the extraction result.
230///
231/// Uses `wait-timeout` for cross-platform subprocess timeout, `env_clear()`
232/// for least-privilege environment, and `--bare` when `ANTHROPIC_API_KEY`
233/// is available (faster startup) vs `--dangerously-skip-permissions` for
234/// OAuth users.
235fn extract_with_claude(
236    binary: &Path,
237    file_content: &[u8],
238    model: Option<&str>,
239    timeout_secs: u64,
240) -> Result<(ExtractionResult, f64), AppError> {
241    use wait_timeout::ChildExt;
242
243    let mut cmd = Command::new(binary);
244
245    cmd.env_clear();
246    for var in &[
247        "PATH",
248        "HOME",
249        "USER",
250        "SHELL",
251        "TERM",
252        "LANG",
253        "XDG_CONFIG_HOME",
254        "XDG_DATA_HOME",
255        "XDG_RUNTIME_DIR",
256        "ANTHROPIC_API_KEY",
257        "CLAUDE_CONFIG_DIR",
258        "TMPDIR",
259        "TMP",
260        "TEMP",
261        "DYLD_FALLBACK_LIBRARY_PATH",
262    ] {
263        if let Ok(val) = std::env::var(var) {
264            cmd.env(var, val);
265        }
266    }
267
268    cmd.arg("-p")
269        .arg(EXTRACTION_PROMPT)
270        .arg("--output-format")
271        .arg("json")
272        .arg("--json-schema")
273        .arg(EXTRACTION_SCHEMA)
274        .arg("--max-turns")
275        .arg("3")
276        .arg("--no-session-persistence");
277
278    if std::env::var("ANTHROPIC_API_KEY").is_ok() {
279        cmd.arg("--bare");
280    } else {
281        cmd.arg("--dangerously-skip-permissions");
282    }
283
284    if let Some(m) = model {
285        cmd.arg("--model").arg(m);
286    }
287
288    cmd.stdin(Stdio::piped())
289        .stdout(Stdio::piped())
290        .stderr(Stdio::piped());
291
292    let mut child = cmd.spawn().map_err(|e| {
293        AppError::Io(std::io::Error::new(
294            e.kind(),
295            format!("failed to spawn claude: {e}"),
296        ))
297    })?;
298
299    let stdin_data = file_content.to_vec();
300    let mut child_stdin = child
301        .stdin
302        .take()
303        .ok_or_else(|| AppError::Validation("failed to open claude stdin".into()))?;
304    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
305        child_stdin.write_all(&stdin_data)?;
306        Ok(())
307    });
308
309    let timeout = std::time::Duration::from_secs(timeout_secs);
310    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
311
312    match status {
313        Some(exit_status) => {
314            stdin_thread
315                .join()
316                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
317                .map_err(AppError::Io)?;
318
319            let mut stdout_buf = Vec::new();
320            let mut stderr_buf = Vec::new();
321            if let Some(mut out) = child.stdout.take() {
322                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
323            }
324            if let Some(mut err) = child.stderr.take() {
325                std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
326            }
327
328            if !exit_status.success() {
329                let stdout_str = String::from_utf8_lossy(&stdout_buf);
330                if let Ok(elements) = serde_json::from_str::<Vec<ClaudeOutputElement>>(&stdout_str)
331                {
332                    if let Some(re) = elements
333                        .iter()
334                        .find(|e| e.r#type.as_deref() == Some("result"))
335                    {
336                        if re.is_error {
337                            let err_msg = re
338                                .error
339                                .as_deref()
340                                .or(re.result.as_deref())
341                                .unwrap_or("unknown error");
342                            if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
343                                return Err(AppError::Validation(format!(
344                                    "RATE_LIMITED: {err_msg}"
345                                )));
346                            }
347                            return Err(AppError::Validation(format!(
348                                "claude -p failed: {err_msg}"
349                            )));
350                        }
351                    }
352                }
353                let stderr_str = String::from_utf8_lossy(&stderr_buf);
354                return Err(AppError::Validation(format!(
355                    "claude -p exited with code {:?}: {}",
356                    exit_status.code(),
357                    stderr_str.trim()
358                )));
359            }
360
361            let stdout = String::from_utf8(stdout_buf)
362                .map_err(|_| AppError::Validation("claude -p stdout is not valid UTF-8".into()))?;
363            parse_claude_output(&stdout)
364        }
365        None => {
366            tracing::warn!(target: "ingest", timeout_secs, "claude -p timed out, killing process");
367            let _ = child.kill();
368            let _ = child.wait();
369            let _ = stdin_thread.join();
370            Err(AppError::Validation(format!(
371                "claude -p timed out after {timeout_secs} seconds"
372            )))
373        }
374    }
375}
376
377/// Parses the JSON array output from `claude -p --output-format json`.
378fn parse_claude_output(stdout: &str) -> Result<(ExtractionResult, f64), AppError> {
379    let elements: Vec<ClaudeOutputElement> = serde_json::from_str(stdout).map_err(|e| {
380        AppError::Validation(format!("failed to parse claude output as JSON array: {e}"))
381    })?;
382
383    let result_elem = elements
384        .iter()
385        .find(|e| e.r#type.as_deref() == Some("result"))
386        .ok_or_else(|| {
387            AppError::Validation("claude output missing 'result' element".to_string())
388        })?;
389
390    if result_elem.is_error {
391        let err_msg = result_elem
392            .error
393            .as_deref()
394            .or(result_elem.result.as_deref())
395            .unwrap_or("unknown error");
396        if err_msg.contains("rate_limit") || err_msg.contains("overloaded") {
397            return Err(AppError::Validation(format!("RATE_LIMITED: {err_msg}")));
398        }
399        return Err(AppError::Validation(format!(
400            "claude extraction failed: {err_msg}"
401        )));
402    }
403
404    let extraction = result_elem
405        .structured_output
406        .clone()
407        .or_else(|| {
408            result_elem
409                .result
410                .as_ref()
411                .and_then(|text| serde_json::from_str::<ExtractionResult>(text).ok())
412        })
413        .ok_or_else(|| {
414            AppError::Validation("claude result missing structured_output and result field".into())
415        })?;
416
417    let cost = result_elem.total_cost_usd.unwrap_or(0.0);
418
419    Ok((extraction, cost))
420}
421
422fn emit_json<T: Serialize>(value: &T) {
423    if let Ok(json) = serde_json::to_string(value) {
424        let stdout = std::io::stdout();
425        let mut lock = stdout.lock();
426        let _ = writeln!(lock, "{json}");
427        let _ = lock.flush();
428    }
429}
430
431/// Collects files matching the pattern (reuses ingest logic).
432fn collect_matching_files(
433    dir: &Path,
434    pattern: &str,
435    recursive: bool,
436    max_files: usize,
437) -> Result<Vec<PathBuf>, AppError> {
438    let mut files = Vec::new();
439    super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
440    files.sort();
441
442    if files.len() > max_files {
443        return Err(AppError::Validation(format!(
444            "found {} files, exceeds --max-files cap of {}",
445            files.len(),
446            max_files
447        )));
448    }
449
450    Ok(files)
451}
452
453/// Opens or creates the queue database for tracking ingest progress.
454fn open_queue_db(path: &str) -> Result<Connection, AppError> {
455    let conn = Connection::open(path)?;
456
457    conn.execute_batch(
458        "CREATE TABLE IF NOT EXISTS queue (
459            id          INTEGER PRIMARY KEY AUTOINCREMENT,
460            file_path   TEXT NOT NULL UNIQUE,
461            name        TEXT,
462            status      TEXT NOT NULL DEFAULT 'pending',
463            memory_id   INTEGER,
464            entities    INTEGER DEFAULT 0,
465            rels        INTEGER DEFAULT 0,
466            error       TEXT,
467            cost_usd    REAL DEFAULT 0.0,
468            attempt     INTEGER DEFAULT 0,
469            elapsed_ms  INTEGER,
470            created_at  TEXT DEFAULT (datetime('now')),
471            done_at     TEXT
472        );
473        CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
474    )?;
475
476    Ok(conn)
477}
478
479/// Main entry point for `ingest --mode claude-code`.
480pub fn run_claude_ingest(args: &IngestArgs) -> Result<(), AppError> {
481    let started = Instant::now();
482
483    if !args.dir.exists() {
484        return Err(AppError::Validation(format!(
485            "directory not found: {}",
486            args.dir.display()
487        )));
488    }
489
490    // Stage 1: Validate
491    let claude_binary = find_claude_binary(args.claude_binary.as_deref())?;
492    let version = validate_claude_version(&claude_binary)?;
493    tracing::info!(
494        target: "ingest",
495        binary = %claude_binary.display(),
496        version = %version,
497        "Claude Code binary validated"
498    );
499
500    emit_json(&PhaseEvent {
501        phase: "validate",
502        claude_path: claude_binary.to_str(),
503        version: Some(&version),
504        dir: None,
505        files_total: None,
506        files_new: None,
507        files_existing: None,
508    });
509
510    // Stage 2: Scan
511    let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
512
513    let queue_conn = open_queue_db(&args.queue_db)?;
514
515    if args.resume {
516        let reset = queue_conn
517            .execute(
518                "UPDATE queue SET status='pending' WHERE status='processing'",
519                [],
520            )
521            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
522        if reset > 0 {
523            tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
524        }
525    }
526
527    if args.retry_failed {
528        let count = queue_conn
529            .execute(
530                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
531                [],
532            )
533            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
534        tracing::info!(target: "ingest", count, "retrying failed files");
535    }
536
537    if !args.resume && !args.retry_failed {
538        queue_conn
539            .execute("DELETE FROM queue", [])
540            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
541    }
542
543    let mut new_count = 0usize;
544    let mut existing_count = 0usize;
545
546    if !args.retry_failed {
547        for file in &files {
548            let file_str = file.to_string_lossy().to_string();
549            let inserted = queue_conn
550                .execute(
551                    "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
552                    rusqlite::params![file_str],
553                )
554                .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
555            if inserted > 0 {
556                new_count += 1;
557            } else {
558                existing_count += 1;
559            }
560        }
561    }
562
563    emit_json(&PhaseEvent {
564        phase: "scan",
565        claude_path: None,
566        version: None,
567        dir: args.dir.to_str(),
568        files_total: Some(files.len()),
569        files_new: Some(new_count),
570        files_existing: Some(existing_count),
571    });
572
573    if args.dry_run {
574        for (idx, file) in files.iter().enumerate() {
575            let (name, _truncated, _orig) =
576                super::ingest::derive_kebab_name(file, args.max_name_length);
577            emit_json(&FileEvent {
578                file: &file.to_string_lossy(),
579                name: &name,
580                status: "preview",
581                memory_id: None,
582                entities: None,
583                rels: None,
584                cost_usd: None,
585                elapsed_ms: None,
586                error: None,
587                index: idx,
588                total: files.len(),
589            });
590        }
591        emit_json(&Summary {
592            summary: true,
593            files_total: files.len(),
594            completed: 0,
595            failed: 0,
596            skipped: 0,
597            entities_total: 0,
598            rels_total: 0,
599            cost_usd: 0.0,
600            elapsed_ms: started.elapsed().as_millis() as u64,
601        });
602        if !args.keep_queue {
603            let _ = std::fs::remove_file(&args.queue_db);
604        }
605        return Ok(());
606    }
607
608    // Stage 3: Process
609    let paths = AppPaths::resolve(args.db.as_deref())?;
610    ensure_db_ready(&paths)?;
611    let conn = open_rw(&paths.db)?;
612    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
613    let memory_type_str = args.r#type.as_str().to_string();
614
615    let mut completed = 0usize;
616    let mut failed = 0usize;
617    let skipped = 0usize;
618    let mut entities_total = 0usize;
619    let mut rels_total = 0usize;
620    let mut cost_total = 0.0f64;
621    let total = files.len();
622
623    let mut backoff_secs = args.rate_limit_wait;
624
625    loop {
626        let pending: Option<(i64, String)> = queue_conn
627            .query_row(
628                "UPDATE queue SET status='processing', attempt=attempt+1 \
629                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
630                 RETURNING id, file_path",
631                [],
632                |row| Ok((row.get(0)?, row.get(1)?)),
633            )
634            .ok();
635
636        let (queue_id, file_path) = match pending {
637            Some(p) => p,
638            None => break,
639        };
640
641        let file_started = Instant::now();
642        let file_content = match std::fs::read(&file_path) {
643            Ok(c) => c,
644            Err(e) => {
645                let err_msg = format!("IO error: {e}");
646                let _ = queue_conn.execute(
647                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
648                    rusqlite::params![err_msg, queue_id],
649                );
650                let current_index = completed + failed + skipped;
651                failed += 1;
652                emit_json(&FileEvent {
653                    file: &file_path,
654                    name: "",
655                    status: "failed",
656                    memory_id: None,
657                    entities: None,
658                    rels: None,
659                    cost_usd: None,
660                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
661                    error: Some(&err_msg),
662                    index: current_index,
663                    total,
664                });
665                if args.fail_fast {
666                    break;
667                }
668                continue;
669            }
670        };
671
672        // B07: retry once on cold-start failure (Claude Code Issue #23265)
673        let max_extract_attempts: u32 = 2;
674        let mut extraction_result: Option<(ExtractionResult, f64)> = None;
675        let mut last_extract_err: Option<String> = None;
676
677        for attempt in 1..=max_extract_attempts {
678            match extract_with_claude(
679                &claude_binary,
680                &file_content,
681                args.claude_model.as_deref(),
682                args.claude_timeout,
683            ) {
684                Ok(result) => {
685                    extraction_result = Some(result);
686                    break;
687                }
688                Err(ref e) if format!("{e}").contains("RATE_LIMITED") => {
689                    last_extract_err = Some(format!("{e}"));
690                    break;
691                }
692                Err(e) => {
693                    let msg = format!("{e}");
694                    if attempt < max_extract_attempts {
695                        tracing::warn!(target: "ingest", attempt, error = %msg, "extraction failed, retrying (cold-start workaround)");
696                        std::thread::sleep(std::time::Duration::from_secs(2));
697                    }
698                    last_extract_err = Some(msg);
699                }
700            }
701        }
702
703        if let Some((extraction, cost)) = extraction_result {
704            backoff_secs = args.rate_limit_wait;
705
706            let name = &extraction.name;
707            let ent_count = extraction.entities.len();
708            let rel_count = extraction.relationships.len();
709
710            let new_entities: Vec<NewEntity> = extraction
711                .entities
712                .iter()
713                .filter_map(|e| match e.entity_type.parse::<EntityType>() {
714                    Ok(et) => Some(NewEntity {
715                        name: e.name.clone(),
716                        entity_type: et,
717                        description: None,
718                    }),
719                    Err(_) => {
720                        tracing::warn!(
721                            target: "ingest",
722                            entity = %e.name,
723                            entity_type = %e.entity_type,
724                            "entity type not recognized, skipping"
725                        );
726                        None
727                    }
728                })
729                .collect();
730
731            let new_relationships: Vec<NewRelationship> = extraction
732                .relationships
733                .iter()
734                .map(|r| NewRelationship {
735                    source: r.source.clone(),
736                    target: r.target.clone(),
737                    relation: r.relation.clone(),
738                    strength: r.strength,
739                    description: None,
740                })
741                .collect();
742
743            let body_str = String::from_utf8_lossy(&file_content);
744            let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
745            let new_memory = NewMemory {
746                name: name.clone(),
747                namespace: namespace.clone(),
748                memory_type: memory_type_str.clone(),
749                description: extraction.description.clone(),
750                body: body_str.to_string(),
751                body_hash,
752                session_id: None,
753                source: "agent".to_string(),
754                metadata: serde_json::Value::Object(serde_json::Map::new()),
755            };
756
757            // B06: deduplication — update existing memory instead of failing on UNIQUE
758            let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
759                Some((existing_id, is_deleted)) => {
760                    if is_deleted {
761                        memories::clear_deleted_at(&conn, existing_id)?;
762                    }
763                    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
764                        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
765                        rusqlite::params![existing_id],
766                        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
767                    )?;
768                    memories::update(&conn, existing_id, &new_memory, None)?;
769                    memories::sync_fts_after_update(
770                        &conn,
771                        existing_id,
772                        &old_name,
773                        &old_desc,
774                        &old_body,
775                        &new_memory.name,
776                        &new_memory.description,
777                        &new_memory.body,
778                    )?;
779                    tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
780                    existing_id
781                }
782                None => match memories::insert(&conn, &new_memory) {
783                    Ok(id) => id,
784                    Err(e) => {
785                        let err_msg = format!("{e}");
786                        let _ = queue_conn.execute(
787                                "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
788                                rusqlite::params![err_msg, queue_id],
789                            );
790                        let current_index = completed + failed + skipped;
791                        failed += 1;
792                        emit_json(&FileEvent {
793                            file: &file_path,
794                            name,
795                            status: "failed",
796                            memory_id: None,
797                            entities: None,
798                            rels: None,
799                            cost_usd: Some(cost),
800                            elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
801                            error: Some(&err_msg),
802                            index: current_index,
803                            total,
804                        });
805                        cost_total += cost;
806                        if args.fail_fast {
807                            break;
808                        }
809                        continue;
810                    }
811                },
812            };
813
814            for ent in &new_entities {
815                if let Ok(eid) = entities::upsert_entity(&conn, &namespace, ent) {
816                    let _ = entities::link_memory_entity(&conn, memory_id, eid);
817                }
818            }
819            for rel in &new_relationships {
820                crate::parsers::warn_if_non_canonical(&rel.relation);
821                let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
822                let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
823                if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
824                    let _ = conn.execute(
825                        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
826                        rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
827                    );
828                }
829            }
830
831            let _ = queue_conn.execute(
832                "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, cost_usd=?5, elapsed_ms=?6, done_at=datetime('now') WHERE id=?7",
833                rusqlite::params![
834                    name,
835                    memory_id,
836                    ent_count,
837                    rel_count,
838                    cost,
839                    file_started.elapsed().as_millis() as i64,
840                    queue_id
841                ],
842            );
843
844            let current_index = completed + failed + skipped;
845            completed += 1;
846            entities_total += ent_count;
847            rels_total += rel_count;
848            cost_total += cost;
849
850            emit_json(&FileEvent {
851                file: &file_path,
852                name,
853                status: "done",
854                memory_id: Some(memory_id),
855                entities: Some(ent_count),
856                rels: Some(rel_count),
857                cost_usd: Some(cost),
858                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
859                error: None,
860                index: current_index,
861                total,
862            });
863        } else if let Some(ref err_str) = last_extract_err {
864            if err_str.contains("RATE_LIMITED") {
865                tracing::warn!(
866                    target: "ingest",
867                    wait_seconds = backoff_secs,
868                    "rate limited, waiting before retry"
869                );
870                let _ = queue_conn.execute(
871                    "UPDATE queue SET status='pending' WHERE id=?1",
872                    rusqlite::params![queue_id],
873                );
874                std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
875                backoff_secs = (backoff_secs * 2).min(900);
876                continue;
877            } else {
878                let _ = queue_conn.execute(
879                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
880                    rusqlite::params![err_str, queue_id],
881                );
882                let current_index = completed + failed + skipped;
883                failed += 1;
884                emit_json(&FileEvent {
885                    file: &file_path,
886                    name: "",
887                    status: "failed",
888                    memory_id: None,
889                    entities: None,
890                    rels: None,
891                    cost_usd: None,
892                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
893                    error: Some(err_str),
894                    index: current_index,
895                    total,
896                });
897                if args.fail_fast {
898                    break;
899                }
900            }
901        }
902
903        if let Some(budget) = args.max_cost_usd {
904            if cost_total >= budget {
905                tracing::warn!(
906                    target: "ingest",
907                    spent = cost_total,
908                    budget = budget,
909                    "budget exceeded, stopping"
910                );
911                break;
912            }
913        }
914    }
915
916    // Stage 4: Summary
917    emit_json(&Summary {
918        summary: true,
919        files_total: total,
920        completed,
921        failed,
922        skipped,
923        entities_total,
924        rels_total,
925        cost_usd: cost_total,
926        elapsed_ms: started.elapsed().as_millis() as u64,
927    });
928
929    if !args.keep_queue && failed == 0 {
930        let _ = std::fs::remove_file(&args.queue_db);
931    }
932
933    Ok(())
934}
935
#[cfg(test)]
mod tests {
    use super::*;

    /// Scoped override of a single process environment variable.
    ///
    /// The value present at creation time is captured and put back when the
    /// guard is dropped (the variable is removed again if it was unset), so an
    /// override cannot leak into later tests even when the test body panics.
    ///
    /// NOTE(review): the environment is process-global and this guard does not
    /// serialize access; tests running in parallel can still observe the
    /// temporary value while the guard is alive.
    struct EnvOverride {
        key: &'static str,
        saved: Option<std::ffi::OsString>,
    }

    impl EnvOverride {
        /// Sets `key` to `value`, or unsets `key` when `value` is `None`,
        /// remembering the previous state for restoration on drop.
        fn apply(key: &'static str, value: Option<&str>) -> Self {
            let saved = std::env::var_os(key);
            match value {
                Some(v) => std::env::set_var(key, v),
                None => std::env::remove_var(key),
            }
            Self { key, saved }
        }
    }

    impl Drop for EnvOverride {
        fn drop(&mut self) {
            match self.saved.take() {
                Some(previous) => std::env::set_var(self.key, previous),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// The embedded extraction schema must parse as JSON.
    #[test]
    fn test_extraction_schema_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA).expect("schema must be valid JSON");
    }

    /// A successful `result` event with `structured_output` yields the parsed
    /// extraction together with the reported cost.
    #[test]
    fn test_parse_claude_output_valid() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"assistant"},
            {"type":"result","is_error":false,"total_cost_usd":0.02,"structured_output":{"name":"test-doc","description":"A test document","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-doc","relation":"applies-to","strength":0.8}]}}
        ]"#;
        let (result, cost) = parse_claude_output(output).expect("parse must succeed");
        assert_eq!(result.name, "test-doc");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        assert!((cost - 0.02).abs() < f64::EPSILON);
    }

    /// An error `result` event surfaces the text of its `error` field.
    #[test]
    fn test_parse_claude_output_error() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"authentication failed"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(err.to_string().contains("authentication failed"));
    }

    /// An error mentioning `rate_limit` is reported with the `RATE_LIMITED`
    /// marker in its message.
    #[test]
    fn test_parse_claude_output_rate_limit() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"error":"rate_limit exceeded"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        assert!(err.to_string().contains("RATE_LIMITED"));
    }

    /// Input that is not JSON at all is rejected.
    #[test]
    fn test_parse_claude_output_malformed() {
        let output = "not json at all";
        assert!(parse_claude_output(output).is_err());
    }

    /// With `PATH` pointing at a nonexistent directory and the override
    /// variable unset, binary discovery must fail.
    #[test]
    fn test_find_claude_binary_not_found() {
        let result = {
            // Both overrides are undone when this scope ends, including the
            // cases where `PATH` was originally unset or the call panics.
            let _path = EnvOverride::apply("PATH", Some("/nonexistent"));
            let _binary = EnvOverride::apply("SQLITE_GRAPHRAG_CLAUDE_BINARY", None);
            find_claude_binary(None)
        };
        assert!(result.is_err());
    }

    /// When `structured_output` is null, the JSON string carried in the
    /// `result` field is parsed instead.
    #[test]
    fn test_parse_claude_output_result_fallback() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":false,"total_cost_usd":0.01,"structured_output":null,"result":"{\"name\":\"test-fallback\",\"description\":\"A fallback test\",\"entities\":[{\"name\":\"fb-entity\",\"entity_type\":\"concept\"}],\"relationships\":[]}"}
        ]"#;
        let (result, cost) = parse_claude_output(output).expect("result fallback must work");
        assert_eq!(result.name, "test-fallback");
        assert_eq!(result.entities.len(), 1);
        assert!(result.relationships.is_empty());
        assert!((cost - 0.01).abs() < f64::EPSILON);
    }

    /// An error event that carries its text in `result` (no `error` field)
    /// still surfaces that text in the returned error.
    #[test]
    fn test_parse_claude_output_error_with_result_field() {
        let output = r#"[
            {"type":"system","subtype":"init"},
            {"type":"result","is_error":true,"result":"Not logged in · Please run /login"}
        ]"#;
        let err = parse_claude_output(output).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("Not logged in"),
            "expected 'Not logged in' in: {msg}"
        );
    }

    /// Every `entity_type` value listed in the schema must parse into
    /// `EntityType`, keeping the schema and the enum in sync.
    #[test]
    fn test_extraction_schema_entity_types_match_enum() {
        let schema: serde_json::Value = serde_json::from_str(EXTRACTION_SCHEMA).unwrap();
        let types = schema["properties"]["entities"]["items"]["properties"]["entity_type"]["enum"]
            .as_array()
            .expect("schema must have entity_type enum");
        for t in types {
            let s = t.as_str().unwrap();
            assert!(
                s.parse::<EntityType>().is_ok(),
                "schema entity_type '{s}' not in EntityType enum"
            );
        }
    }
}