Skip to main content

sqlite_graphrag/commands/
ingest_codex.rs

1//! Handler for `ingest --mode codex`.
2//!
3//! Orchestrates the locally installed OpenAI Codex CLI binary (`codex exec`)
4//! to extract domain-specific entities and relationships from each file,
5//! then persists them with full embedding pipeline for recall/hybrid-search.
6//!
7//! Architecture: P1 One-Shot per file — each file spawns a separate
8//! `codex exec` process with `--output-schema` for guaranteed structured output.
9//! A SQLite queue DB tracks progress for resume/retry support.
10// Workload: Subprocess I/O-bound (codex exec headless with network wait)
11
12use crate::commands::ingest::IngestArgs;
13use crate::commands::ingest_claude::ExtractionResult;
14use crate::entity_type::EntityType;
15use crate::errors::AppError;
16use crate::paths::AppPaths;
17use crate::storage::connection::{ensure_db_ready, open_rw};
18use crate::storage::entities::{self, NewEntity, NewRelationship};
19use crate::storage::memories::{self, NewMemory};
20
21use rusqlite::Connection;
22use serde::{Deserialize, Serialize};
23use std::io::Write;
24use std::path::{Path, PathBuf};
25use std::process::{Command, Stdio};
26use std::time::Instant;
27
/// Oldest Codex CLI release accepted by `validate_codex_version`.
///
/// Compared numerically as a `(major, minor, patch)` triple, not as a string.
const MIN_CODEX_VERSION: &str = "0.120.0";
29
/// OpenAI structured output schema with `additionalProperties: false` at all nested levels.
///
/// Written to a temp file by `write_schema_tempfile` and handed to the Codex
/// CLI through `--output-schema`. The top-level object requires `name`,
/// `description`, `entities` and `relationships`; the `entity_type` and
/// `relation` fields are restricted to the enumerations listed below.
const EXTRACTION_SCHEMA_CODEX: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
72
/// Instruction text sent to Codex ahead of the document body.
///
/// `extract_with_codex` prepends this prompt to the file content on stdin.
/// Each source line below is one line of the prompt; the blank line before
/// `Rules:` comes from the doubled `\n`.
const EXTRACTION_PROMPT: &str = concat!(
    "You are a knowledge graph entity extractor. Given a document, extract:\n",
    "1. A short kebab-case name (max 60 chars) capturing the document's main topic\n",
    "2. A one-sentence description (10-20 words) summarizing the key insight\n",
    "3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n",
    "4. Typed relationships between entities with strength scores\n\n",
    "Rules:\n",
    "- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n",
    "- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n",
    "- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n",
    "- NEVER use 'mentions' as relationship type\n",
    "- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n",
    "- Prefer fewer high-quality entities over many low-quality ones\n",
    "- Description must answer: What is this about and WHY does it matter?",
);
86
/// Token usage reported by Codex CLI on `turn.completed` events.
///
/// Deserialized from the event's `usage` object by `parse_codex_output`.
/// Fields marked `#[serde(default)]` fall back to `0` when the key is absent;
/// the other two are required for deserialization to succeed.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct CodexUsage {
    // Required key.
    input_tokens: u64,
    // Optional key; defaults to 0.
    #[serde(default)]
    cached_input_tokens: u64,
    // Required key.
    output_tokens: u64,
    // Optional key; defaults to 0.
    #[serde(default)]
    reasoning_output_tokens: u64,
}
97
/// Progress record serialized as one JSON line per pipeline phase.
///
/// `run_codex_ingest` emits it with `phase` set to `"validate"` (binary path
/// and version filled in) and `"scan"` (directory and file counts filled in).
/// Every optional field is left out of the JSON when it is `None`.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    // Phase label, e.g. "validate" or "scan".
    phase: &'a str,
    // Path of the resolved Codex binary (validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    codex_path: Option<&'a str>,
    // Raw version string returned by the version check (validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    // Directory being ingested (scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    // Number of files matched by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    // Files newly inserted into the queue during this scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    // Files that were already present in the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
114
/// Per-file record serialized as one JSON line while files are processed.
///
/// `status` values visible in this file include `"preview"` (dry run) and
/// `"failed"`; other values may be emitted further down the pipeline.
/// Optional fields are omitted from the JSON when `None`.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    // Path of the file this event refers to.
    file: &'a str,
    // Memory name; emitted as an empty string on the early failure paths.
    name: &'a str,
    // Outcome label for this file.
    status: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    // Count of extracted entities, when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    // Count of extracted relationships, when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Always None for Codex (no cost_usd in Codex API responses).
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    input_tokens: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    output_tokens: Option<u64>,
    // Wall-clock time spent on this file, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    // Human-readable failure reason, present on failures.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    // Position of this file in the run and the total number of files.
    index: usize,
    total: usize,
}
140
/// Final totals for an ingest run, serialized as a single JSON line.
///
/// The dry-run path emits it with all counters at zero except `files_total`.
#[derive(Debug, Serialize)]
struct Summary {
    // Marker flag; set to `true` where this record is emitted.
    summary: bool,
    files_total: usize,
    completed: usize,
    failed: usize,
    skipped: usize,
    entities_total: usize,
    rels_total: usize,
    input_tokens_total: u64,
    output_tokens_total: u64,
    // Wall-clock duration of the whole run, in milliseconds.
    elapsed_ms: u64,
}
154
155/// Locates the Codex CLI binary on the system.
156///
157/// Search order:
158/// 1. Explicit `--codex-binary` CLI flag.
159/// 2. `SQLITE_GRAPHRAG_CODEX_BINARY` env var.
160/// 3. PATH search for `codex` (or `codex.exe` on Windows).
161pub fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
162    if let Some(p) = explicit {
163        if p.exists() {
164            return Ok(p.to_path_buf());
165        }
166        return Err(AppError::Validation(format!(
167            "Codex CLI binary not found at explicit path: {}",
168            p.display()
169        )));
170    }
171
172    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
173        let p = PathBuf::from(&env_path);
174        if p.exists() {
175            return Ok(p);
176        }
177    }
178
179    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
180    if let Some(path_var) = std::env::var_os("PATH") {
181        for dir in std::env::split_paths(&path_var) {
182            let candidate = dir.join(name);
183            if candidate.exists() {
184                return Ok(candidate);
185            }
186        }
187    }
188
189    Err(AppError::Validation(
190        "Codex CLI binary not found in PATH. Install it from https://github.com/openai/codex or specify --codex-binary".to_string(),
191    ))
192}
193
194/// Validates that the Codex CLI binary meets the minimum version requirement.
195///
196/// # Errors
197///
198/// Returns `AppError::Validation` when the binary cannot be executed or the
199/// version is below `MIN_CODEX_VERSION`.
200fn validate_codex_version(binary: &Path) -> Result<String, AppError> {
201    let resolved = which::which(binary).map_err(|_| {
202        AppError::Validation(format!(
203            "executable '{}' not found in PATH; ensure Codex CLI is installed",
204            binary.display()
205        ))
206    })?;
207    let output = Command::new(&resolved)
208        .arg("--version")
209        .stdin(Stdio::null())
210        .stdout(Stdio::piped())
211        .stderr(Stdio::piped())
212        .output()
213        .map_err(AppError::Io)?;
214
215    let raw = String::from_utf8(output.stdout)
216        .map_err(|_| AppError::Validation("codex --version output is not UTF-8".to_string()))?;
217
218    let version_str = raw.trim().to_string();
219
220    // Codex CLI outputs: "codex-cli 0.133.0" or just "0.133.0"
221    let numeric = version_str.split_whitespace().last().unwrap_or("").trim();
222
223    fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
224        let parts: Vec<&str> = s.splitn(3, '.').collect();
225        if parts.len() < 2 {
226            return None;
227        }
228        let major = parts[0].parse::<u64>().ok()?;
229        let minor = parts[1].parse::<u64>().ok()?;
230        let patch = parts
231            .get(2)
232            .and_then(|p| p.parse::<u64>().ok())
233            .unwrap_or(0);
234        Some((major, minor, patch))
235    }
236
237    if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CODEX_VERSION)) {
238        if actual < min {
239            return Err(AppError::Validation(format!(
240                "Codex CLI version {numeric} is below minimum required {MIN_CODEX_VERSION}"
241            )));
242        }
243    }
244
245    Ok(version_str)
246}
247
248/// Writes the extraction schema to a named temp file for `--output-schema`.
249///
250/// # Errors
251///
252/// Returns `AppError::Io` when the temp file cannot be created or written.
253fn write_schema_tempfile() -> Result<tempfile::NamedTempFile, AppError> {
254    let mut f = tempfile::NamedTempFile::new().map_err(AppError::Io)?;
255    std::io::Write::write_all(&mut f, EXTRACTION_SCHEMA_CODEX.as_bytes()).map_err(AppError::Io)?;
256    std::io::Write::flush(&mut f).map_err(AppError::Io)?;
257    Ok(f)
258}
259
260/// Invokes `codex exec` for a single file and returns the extraction result.
261///
262/// Uses `wait-timeout` for cross-platform subprocess timeout, `env_clear()`
263/// for least-privilege environment, and reads prompt + file content from
264/// stdin using the `-` argument (Codex Paperclip pattern).
265///
266/// # Errors
267///
268/// Returns `AppError::Validation` on extraction failure, rate limiting, or
269/// schema errors. Returns `AppError::Io` on process spawn/IO failures.
270fn extract_with_codex(
271    binary: &Path,
272    file_content: &[u8],
273    model: Option<&str>,
274    timeout_secs: u64,
275    schema_file: &Path,
276) -> Result<(ExtractionResult, Option<CodexUsage>), AppError> {
277    use wait_timeout::ChildExt;
278
279    // G31 Passo C (v1.0.69): delegate command construction to the shared
280    // `codex_spawn::build_codex_command` helper so `enrich` and `ingest` stay
281    // perfectly aligned on the canonical seven hardening flags. The local
282    // function still owns the stdin pump + JSONL parsing (see below).
283    let _ = timeout_secs; // currently unused; consumed by the helper when it spawns the process
284    let _ = file_content; // pumped into stdin below, see `stdin_pump` thread
285    let _ = schema_file; // helper reuses the temp file at the given path
286    let prompt = String::new(); // empty prompt — helper appends file_content via args.input_text
287    let mut cmd = crate::commands::codex_spawn::build_codex_command(
288        &crate::commands::codex_spawn::CodexSpawnArgs {
289            binary,
290            prompt: &prompt,
291            json_schema: "", // caller writes the schema directly via `schema_file`
292            input_text: "",
293            model,
294            timeout_secs,
295            schema_path: schema_file.to_path_buf(),
296        },
297    )?;
298
299    // `build_codex_command` writes the JSON schema to `schema_path` and
300    // appends `input_text` to the prompt via Paperclip stdin. For `ingest`
301    // we want the schema content already on disk (the caller pre-wrote
302    // EXTRACTION_SCHEMA_CODEX into the named tempfile), and the document
303    // content goes through stdin via a dedicated thread (see below). Strip
304    // the file the helper just rewrote — our caller pre-wrote it.
305    let _ = std::fs::write(
306        schema_file,
307        crate::commands::ingest_codex::EXTRACTION_SCHEMA_CODEX,
308    );
309
310    cmd.stdin(Stdio::piped())
311        .stdout(Stdio::piped())
312        .stderr(Stdio::piped());
313
314    let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
315        AppError::Io(std::io::Error::new(
316            e.kind(),
317            format!("failed to spawn codex: {e}"),
318        ))
319    })?;
320
321    // Build stdin: prompt + document content (strict UTF-8 to surface encoding bugs early)
322    let file_utf8 = String::from_utf8(file_content.to_vec())
323        .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
324    let stdin_payload = format!("{EXTRACTION_PROMPT}\n\n---\n\nDocument content:\n\n{file_utf8}");
325    let stdin_bytes = stdin_payload.into_bytes();
326
327    let mut child_stdin = child
328        .stdin
329        .take()
330        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
331    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
332        child_stdin.write_all(&stdin_bytes)?;
333        drop(child_stdin);
334        Ok(())
335    });
336
337    let start = std::time::Instant::now();
338    let timeout = std::time::Duration::from_secs(timeout_secs);
339    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
340
341    match status {
342        Some(exit_status) => {
343            stdin_thread
344                .join()
345                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
346                .map_err(AppError::Io)?;
347
348            tracing::debug!(
349                target: "process",
350                exit_code = ?exit_status.code(),
351                elapsed_ms = start.elapsed().as_millis() as u64,
352                "external process completed"
353            );
354
355            let mut stdout_buf = Vec::new();
356            let mut stderr_buf = Vec::new();
357            if let Some(mut out) = child.stdout.take() {
358                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
359            }
360            if let Some(mut err) = child.stderr.take() {
361                std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
362            }
363
364            if !exit_status.success() {
365                let stderr_str = String::from_utf8_lossy(&stderr_buf);
366                let stdout_str = String::from_utf8_lossy(&stdout_buf);
367                // Check if stdout has JSONL with an error event before falling back
368                if let Ok((result, usage)) = parse_codex_output(&stdout_str) {
369                    return Ok((result, usage));
370                }
371                if stderr_str.contains("401")
372                    || stderr_str.contains("Unauthorized")
373                    || stderr_str.contains("auth")
374                {
375                    tracing::warn!(
376                        target: "ingest",
377                        "Codex CLI authentication expired. Re-authenticate with: codex auth login"
378                    );
379                }
380                return Err(AppError::Validation(format!(
381                    "codex exec exited with code {:?}: {}",
382                    exit_status.code(),
383                    stderr_str.trim()
384                )));
385            }
386
387            let stdout = String::from_utf8(stdout_buf)
388                .map_err(|_| AppError::Validation("codex exec stdout is not valid UTF-8".into()))?;
389            parse_codex_output(&stdout)
390        }
391        None => {
392            tracing::warn!(target: "ingest", timeout_secs, "codex exec timed out, killing process");
393            let _ = child.kill();
394            let _ = child.wait();
395            let _ = stdin_thread.join();
396            Err(AppError::Validation(format!(
397                "codex exec timed out after {timeout_secs} seconds"
398            )))
399        }
400    }
401}
402
403/// Parses JSONL output from `codex exec --json`.
404///
405/// Event format (DOTS notation):
406/// - `thread.started` — session init
407/// - `turn.started` — model turn begins
408/// - `item.completed` — message or tool call; last `agent_message` wins
409/// - `turn.completed` — includes usage stats
410/// - `turn.failed` — error with optional rate-limit indicator
411/// - `error` — schema or validation error
412///
413/// # Errors
414///
415/// Returns `AppError::Validation` when no agent_message is found, when the
416/// turn failed, or when the extracted JSON cannot be parsed as `ExtractionResult`.
417fn parse_codex_output(stdout: &str) -> Result<(ExtractionResult, Option<CodexUsage>), AppError> {
418    let mut last_agent_text: Option<String> = None;
419    let mut usage: Option<CodexUsage> = None;
420    let mut rate_limited = false;
421    let mut schema_error = false;
422    let mut turn_failed = false;
423    let mut failed_message = String::new();
424
425    for line in stdout.lines() {
426        let line = line.trim();
427        if line.is_empty() {
428            continue;
429        }
430
431        let event: serde_json::Value = match serde_json::from_str(line) {
432            Ok(v) => v,
433            Err(_) => {
434                tracing::warn!(target: "ingest", line, "codex output: skipping malformed JSONL line");
435                continue;
436            }
437        };
438
439        let event_type = match event.get("type").and_then(|t| t.as_str()) {
440            Some(t) => t,
441            None => continue,
442        };
443
444        match event_type {
445            "item.completed" => {
446                // Last agent_message wins (reasoning / tool calls may appear before)
447                if let Some(item) = event.get("item") {
448                    if item.get("type").and_then(|t| t.as_str()) == Some("agent_message") {
449                        if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
450                            last_agent_text = Some(text.to_string());
451                        }
452                    }
453                }
454            }
455            "turn.completed" => {
456                if let Some(u) = event.get("usage") {
457                    if let Ok(parsed) = serde_json::from_value::<CodexUsage>(u.clone()) {
458                        usage = Some(parsed);
459                    }
460                }
461            }
462            "turn.failed" => {
463                turn_failed = true;
464                if let Some(err) = event.get("error") {
465                    let msg = err
466                        .get("message")
467                        .and_then(|m| m.as_str())
468                        .unwrap_or("unknown error");
469                    failed_message = msg.to_string();
470                    if msg.contains("rate_limit")
471                        || msg.contains("429")
472                        || msg.contains("Too Many Requests")
473                    {
474                        rate_limited = true;
475                    }
476                }
477            }
478            "error" => {
479                if let Some(msg) = event.get("message").and_then(|m| m.as_str()) {
480                    if msg.contains("invalid_json_schema") || msg.contains("schema") {
481                        schema_error = true;
482                    }
483                    tracing::warn!(target: "ingest", error_msg = msg, "codex error event received");
484                }
485            }
486            _ => {
487                // Gracefully skip unknown event types (thread.started, turn.started, etc.)
488            }
489        }
490    }
491
492    if rate_limited {
493        return Err(AppError::RateLimited {
494            detail: failed_message,
495        });
496    }
497
498    if schema_error {
499        return Err(AppError::Validation(
500            "codex rejected the output schema (invalid_json_schema)".to_string(),
501        ));
502    }
503
504    if turn_failed {
505        return Err(AppError::Validation(format!(
506            "codex turn failed: {failed_message}"
507        )));
508    }
509
510    let text = last_agent_text.ok_or_else(|| {
511        AppError::Validation("codex output contained no agent_message item".to_string())
512    })?;
513
514    let extraction: ExtractionResult = serde_json::from_str(&text).map_err(|e| {
515        AppError::Validation(format!(
516            "failed to parse codex agent_message as ExtractionResult: {e}. text={text}"
517        ))
518    })?;
519
520    Ok((extraction, usage))
521}
522
523use crate::output::emit_json_line as emit_json;
524
525/// Collects files matching the pattern (reuses ingest logic).
526fn collect_matching_files(
527    dir: &Path,
528    pattern: &str,
529    recursive: bool,
530    max_files: usize,
531) -> Result<Vec<PathBuf>, AppError> {
532    let mut files = Vec::new();
533    super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
534    files.sort_unstable();
535
536    if files.len() > max_files {
537        return Err(AppError::Validation(format!(
538            "found {} files, exceeds --max-files cap of {}",
539            files.len(),
540            max_files
541        )));
542    }
543
544    Ok(files)
545}
546
547/// Opens or creates the queue database for tracking ingest progress.
548fn open_queue_db<P: AsRef<std::path::Path>>(path: P) -> Result<Connection, AppError> {
549    let conn = Connection::open(path)?;
550
551    conn.execute_batch(
552        "PRAGMA journal_mode=WAL;
553        CREATE TABLE IF NOT EXISTS queue (
554            id          INTEGER PRIMARY KEY AUTOINCREMENT,
555            file_path   TEXT NOT NULL UNIQUE,
556            name        TEXT,
557            status      TEXT NOT NULL DEFAULT 'pending',
558            memory_id   INTEGER,
559            entities    INTEGER DEFAULT 0,
560            rels        INTEGER DEFAULT 0,
561            error       TEXT,
562            input_tokens  INTEGER DEFAULT 0,
563            output_tokens INTEGER DEFAULT 0,
564            attempt     INTEGER DEFAULT 0,
565            elapsed_ms  INTEGER,
566            created_at  TEXT DEFAULT (datetime('now')),
567            done_at     TEXT
568        );
569        CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
570    )?;
571
572    Ok(conn)
573}
574
575/// Main entry point for `ingest --mode codex`.
576///
577/// # Errors
578///
579/// Returns `AppError` on directory/DB access failures or fatal extraction errors.
580pub fn run_codex_ingest(args: &IngestArgs) -> Result<(), AppError> {
581    let started = Instant::now();
582
583    if !args.dir.exists() {
584        return Err(AppError::Validation(format!(
585            "directory not found: {}",
586            args.dir.display()
587        )));
588    }
589
590    // G28-B (v1.0.68) + G30 (v1.0.69): acquire singleton before doing real
591    // work so two parallel `ingest --mode codex` invocations cannot co-exist
592    // on the same database. Scope includes the database hash so concurrent
593    // ingest against different databases is allowed.
594    let early_ns = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
595    let early_paths = AppPaths::resolve(args.db.as_deref())?;
596    let queue_path = match args.queue_db.as_deref() {
597        Some(p) => std::path::PathBuf::from(p),
598        None => crate::paths::sidecar_path(&early_paths.db, ".ingest-queue.sqlite"),
599    };
600    let _singleton = crate::lock::acquire_job_singleton(
601        crate::lock::JobType::IngestCodex,
602        &early_ns,
603        &early_paths.db,
604        args.wait_job_singleton,
605        args.force_job_singleton,
606    )?;
607
608    // Stage 1: Validate binary
609    let codex_binary = find_codex_binary(args.codex_binary.as_deref())?;
610    let version = validate_codex_version(&codex_binary)?;
611    tracing::info!(
612        target: "ingest",
613        binary = %codex_binary.display(),
614        version = %version,
615        "Codex CLI binary validated"
616    );
617
618    emit_json(&PhaseEvent {
619        phase: "validate",
620        codex_path: codex_binary.to_str(),
621        version: Some(&version),
622        dir: None,
623        files_total: None,
624        files_new: None,
625        files_existing: None,
626    });
627
628    // Stage 2: Scan files
629    let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
630
631    let queue_conn = open_queue_db(&queue_path)?;
632
633    if args.resume {
634        let reset = queue_conn
635            .execute(
636                "UPDATE queue SET status='pending' WHERE status='processing'",
637                [],
638            )
639            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
640        if reset > 0 {
641            tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
642        }
643    }
644
645    if args.retry_failed {
646        let count = queue_conn
647            .execute(
648                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
649                [],
650            )
651            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
652        tracing::info!(target: "ingest", count, "retrying failed files");
653    }
654
655    if !args.resume && !args.retry_failed {
656        queue_conn
657            .execute("DELETE FROM queue", [])
658            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
659    }
660
661    let mut new_count = 0usize;
662    let mut existing_count = 0usize;
663
664    if !args.retry_failed {
665        for file in &files {
666            let file_str = file.to_string_lossy().into_owned();
667            let inserted = queue_conn
668                .execute(
669                    "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
670                    rusqlite::params![file_str],
671                )
672                .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
673            if inserted > 0 {
674                new_count += 1;
675            } else {
676                existing_count += 1;
677            }
678        }
679    }
680
681    emit_json(&PhaseEvent {
682        phase: "scan",
683        codex_path: None,
684        version: None,
685        dir: args.dir.to_str(),
686        files_total: Some(files.len()),
687        files_new: Some(new_count),
688        files_existing: Some(existing_count),
689    });
690
691    if args.dry_run {
692        for (idx, file) in files.iter().enumerate() {
693            let (name, _truncated, _orig) =
694                super::ingest::derive_kebab_name(file, args.max_name_length);
695            emit_json(&FileEvent {
696                file: &file.to_string_lossy(),
697                name: &name,
698                status: "preview",
699                memory_id: None,
700                entities: None,
701                rels: None,
702                cost_usd: None,
703                input_tokens: None,
704                output_tokens: None,
705                elapsed_ms: None,
706                error: None,
707                index: idx,
708                total: files.len(),
709            });
710        }
711        emit_json(&Summary {
712            summary: true,
713            files_total: files.len(),
714            completed: 0,
715            failed: 0,
716            skipped: 0,
717            entities_total: 0,
718            rels_total: 0,
719            input_tokens_total: 0,
720            output_tokens_total: 0,
721            elapsed_ms: started.elapsed().as_millis() as u64,
722        });
723        if !args.keep_queue {
724            let _ = std::fs::remove_file(&queue_path);
725        }
726        return Ok(());
727    }
728
729    // Stage 3: Process files
730    let paths = AppPaths::resolve(args.db.as_deref())?;
731    ensure_db_ready(&paths)?;
732    let conn = open_rw(&paths.db)?;
733    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
734    let memory_type_str = args.r#type.as_str().to_string();
735
736    // Write schema to temp file once (reused across all files)
737    let schema_tempfile = write_schema_tempfile()?;
738    let schema_path = schema_tempfile.path().to_path_buf();
739
740    let mut completed = 0usize;
741    let mut failed = 0usize;
742    let skipped_initial: usize = queue_conn
743        .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
744            r.get::<_, usize>(0)
745        })
746        .unwrap_or(0);
747    let mut skipped = skipped_initial;
748    let mut entities_total = 0usize;
749    let mut rels_total = 0usize;
750    let mut input_tokens_total = 0u64;
751    let mut output_tokens_total = 0u64;
752    let total = files.len();
753
754    let mut backoff_secs = args.rate_limit_wait;
755    let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
756
757    loop {
758        if crate::shutdown_requested() {
759            tracing::info!(target: "ingest", "shutdown requested, stopping before next file");
760            break;
761        }
762
763        let pending: Option<(i64, String)> = queue_conn
764            .query_row(
765                "UPDATE queue SET status='processing', attempt=attempt+1 \
766                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
767                 RETURNING id, file_path",
768                [],
769                |row| Ok((row.get(0)?, row.get(1)?)),
770            )
771            .ok();
772
773        let (queue_id, file_path) = match pending {
774            Some(p) => p,
775            None => break,
776        };
777
778        let file_started = Instant::now();
779
780        // Reject files that exceed the 10 MB stdin limit
781        const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
782        if let Ok(meta) = std::fs::metadata(&file_path) {
783            if meta.len() > MAX_FILE_SIZE {
784                let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
785                let _ = queue_conn.execute(
786                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
787                    rusqlite::params![err_msg, queue_id],
788                );
789                let current_index = completed + failed + skipped;
790                failed += 1;
791                emit_json(&FileEvent {
792                    file: &file_path,
793                    name: "",
794                    status: "failed",
795                    memory_id: None,
796                    entities: None,
797                    rels: None,
798                    cost_usd: None,
799                    input_tokens: None,
800                    output_tokens: None,
801                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
802                    error: Some(&err_msg),
803                    index: current_index,
804                    total,
805                });
806                if args.fail_fast {
807                    break;
808                }
809                continue;
810            }
811        }
812
813        let file_content = match std::fs::read(&file_path) {
814            Ok(c) => c,
815            Err(e) => {
816                let err_msg = format!("IO error: {e}");
817                let _ = queue_conn.execute(
818                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
819                    rusqlite::params![err_msg, queue_id],
820                );
821                let current_index = completed + failed + skipped;
822                failed += 1;
823                emit_json(&FileEvent {
824                    file: &file_path,
825                    name: "",
826                    status: "failed",
827                    memory_id: None,
828                    entities: None,
829                    rels: None,
830                    cost_usd: None,
831                    input_tokens: None,
832                    output_tokens: None,
833                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
834                    error: Some(&err_msg),
835                    index: current_index,
836                    total,
837                });
838                if args.fail_fast {
839                    break;
840                }
841                continue;
842            }
843        };
844
845        // Skip files exceeding body cap BEFORE sending to LLM to avoid wasting tokens
846        if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
847            let err_msg = format!(
848                "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
849                crate::constants::MAX_MEMORY_BODY_LEN,
850                file_content.len()
851            );
852            tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
853            let _ = queue_conn.execute(
854                "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
855                rusqlite::params![err_msg, queue_id],
856            );
857            let current_index = completed + failed + skipped;
858            skipped += 1;
859            emit_json(&FileEvent {
860                file: &file_path,
861                name: "",
862                status: "skipped",
863                memory_id: None,
864                entities: None,
865                rels: None,
866                cost_usd: None,
867                input_tokens: None,
868                output_tokens: None,
869                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
870                error: Some(&err_msg),
871                index: current_index,
872                total,
873            });
874            continue;
875        }
876
877        // Retry once on cold-start failure
878        let max_extract_attempts: u32 = 2;
879        let mut extraction_result: Option<(ExtractionResult, Option<CodexUsage>)> = None;
880        let mut last_extract_err: Option<String> = None;
881        let mut last_was_rate_limited = false;
882
883        for attempt in 1..=max_extract_attempts {
884            match extract_with_codex(
885                &codex_binary,
886                &file_content,
887                args.codex_model.as_deref(),
888                args.codex_timeout,
889                &schema_path,
890            ) {
891                Ok(result) => {
892                    extraction_result = Some(result);
893                    break;
894                }
895                Err(ref e) if matches!(e, AppError::RateLimited { .. }) => {
896                    last_extract_err = Some(format!("{e}"));
897                    last_was_rate_limited = true;
898                    break;
899                }
900                Err(e) => {
901                    let msg = format!("{e}");
902                    if attempt < max_extract_attempts {
903                        let cold_start_delay = 2 * attempt as u64;
904                        tracing::warn!(
905                            target: "ingest",
906                            attempt,
907                            delay_secs = cold_start_delay,
908                            error = %msg,
909                            "codex extraction failed, retrying"
910                        );
911                        std::thread::sleep(std::time::Duration::from_secs(cold_start_delay));
912                    }
913                    last_extract_err = Some(msg);
914                }
915            }
916        }
917
918        if let Some((extraction, usage)) = extraction_result {
919            backoff_secs = args.rate_limit_wait;
920
921            let in_tok = usage.as_ref().map(|u| u.input_tokens).unwrap_or(0);
922            let out_tok = usage.as_ref().map(|u| u.output_tokens).unwrap_or(0);
923
924            let name = &extraction.name;
925            let ent_count = extraction.entities.len();
926            let rel_count = 0;
927
928            // GAP-SG-47: fold non-canonical labels onto the nearest canonical
929            // kind instead of discarding the entity (no silent data loss).
930            let new_entities: Vec<NewEntity> = extraction
931                .entities
932                .iter()
933                .map(|e| NewEntity {
934                    name: e.name.clone(),
935                    entity_type: EntityType::map_to_canonical(&e.entity_type),
936                    description: None,
937                })
938                .collect();
939
940            // GAP-SG-48: rewrite non-canonical relations to canonical instead
941            // of normalizing-and-accepting them raw.
942            let new_relationships: Vec<NewRelationship> = extraction
943                .relationships
944                .iter()
945                .map(|r| NewRelationship {
946                    source: r.source.clone(),
947                    target: r.target.clone(),
948                    relation: crate::parsers::map_to_canonical_relation(&r.relation),
949                    strength: r.strength,
950                    description: None,
951                })
952                .collect();
953
954            let body_str = String::from_utf8(file_content.clone())
955                .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
956            let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
957            let new_memory = NewMemory {
958                name: name.clone(),
959                namespace: namespace.clone(),
960                memory_type: memory_type_str.clone(),
961                description: extraction.description.clone(),
962                body: body_str.to_string(),
963                body_hash,
964                session_id: None,
965                source: "agent".to_string(),
966                metadata: serde_json::Value::Object(serde_json::Map::new()),
967            };
968
969            // Deduplication: update existing memory instead of failing on UNIQUE
970            let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
971                Some((existing_id, is_deleted)) => {
972                    if is_deleted {
973                        memories::clear_deleted_at(&conn, existing_id)?;
974                    }
975                    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
976                        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
977                        rusqlite::params![existing_id],
978                        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
979                    )?;
980                    memories::update(&conn, existing_id, &new_memory, None)?;
981                    memories::sync_fts_after_update(
982                        &conn,
983                        existing_id,
984                        &old_name,
985                        &old_desc,
986                        &old_body,
987                        &new_memory.name,
988                        &new_memory.description,
989                        &new_memory.body,
990                    )?;
991                    tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
992                    existing_id
993                }
994                None => match memories::insert(&conn, &new_memory) {
995                    Ok(id) => id,
996                    Err(e) => {
997                        let err_msg = format!("{e}");
998                        let _ = queue_conn.execute(
999                            "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1000                            rusqlite::params![err_msg, queue_id],
1001                        );
1002                        let current_index = completed + failed + skipped;
1003                        failed += 1;
1004                        emit_json(&FileEvent {
1005                            file: &file_path,
1006                            name,
1007                            status: "failed",
1008                            memory_id: None,
1009                            entities: None,
1010                            rels: None,
1011                            cost_usd: None,
1012                            input_tokens: Some(in_tok),
1013                            output_tokens: Some(out_tok),
1014                            elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1015                            error: Some(&err_msg),
1016                            index: current_index,
1017                            total,
1018                        });
1019                        input_tokens_total += in_tok;
1020                        output_tokens_total += out_tok;
1021                        if args.fail_fast {
1022                            break;
1023                        }
1024                        continue;
1025                    }
1026                },
1027            };
1028
1029            for ent in &new_entities {
1030                if let Ok(eid) = entities::upsert_entity(&conn, &namespace, ent) {
1031                    let _ = entities::link_memory_entity(&conn, memory_id, eid);
1032                }
1033            }
1034            for rel in &new_relationships {
1035                crate::parsers::warn_if_non_canonical(&rel.relation);
1036                let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
1037                let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
1038                if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1039                    let _ = conn.execute(
1040                        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1041                        rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1042                    );
1043                }
1044            }
1045
1046            let _ = queue_conn.execute(
1047                "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, \
1048                 input_tokens=?5, output_tokens=?6, elapsed_ms=?7, done_at=datetime('now') WHERE id=?8",
1049                rusqlite::params![
1050                    name,
1051                    memory_id,
1052                    ent_count,
1053                    rel_count,
1054                    in_tok,
1055                    out_tok,
1056                    file_started.elapsed().as_millis() as i64,
1057                    queue_id
1058                ],
1059            );
1060
1061            let current_index = completed + failed + skipped;
1062            completed += 1;
1063            entities_total += ent_count;
1064            rels_total += rel_count;
1065            input_tokens_total += in_tok;
1066            output_tokens_total += out_tok;
1067
1068            emit_json(&FileEvent {
1069                file: &file_path,
1070                name,
1071                status: "done",
1072                memory_id: Some(memory_id),
1073                entities: Some(ent_count),
1074                rels: Some(rel_count),
1075                cost_usd: None,
1076                input_tokens: Some(in_tok),
1077                output_tokens: Some(out_tok),
1078                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1079                error: None,
1080                index: current_index,
1081                total,
1082            });
1083        } else if let Some(ref err_str) = last_extract_err {
1084            if last_was_rate_limited {
1085                if crate::retry::is_kill_switch_active() {
1086                    tracing::warn!(target: "ingest", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1087                } else if std::time::Instant::now() >= rate_limit_deadline {
1088                    tracing::error!(target: "ingest", "rate-limit retry deadline (1h) exhausted");
1089                } else {
1090                    let half = backoff_secs / 2;
1091                    let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1092                    let actual_wait = half + jitter;
1093                    tracing::warn!(target: "ingest", delay_secs = actual_wait, error_kind = "rate_limited", "Codex rate limited, backing off");
1094                    let _ = queue_conn.execute(
1095                        "UPDATE queue SET status='pending' WHERE id=?1",
1096                        rusqlite::params![queue_id],
1097                    );
1098                    std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1099                    backoff_secs = (backoff_secs * 2).min(900);
1100                    continue;
1101                }
1102            } else {
1103                let _ = queue_conn.execute(
1104                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1105                    rusqlite::params![err_str, queue_id],
1106                );
1107                let current_index = completed + failed + skipped;
1108                failed += 1;
1109                emit_json(&FileEvent {
1110                    file: &file_path,
1111                    name: "",
1112                    status: "failed",
1113                    memory_id: None,
1114                    entities: None,
1115                    rels: None,
1116                    cost_usd: None,
1117                    input_tokens: None,
1118                    output_tokens: None,
1119                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1120                    error: Some(err_str),
1121                    index: current_index,
1122                    total,
1123                });
1124                if args.fail_fast {
1125                    break;
1126                }
1127            }
1128        }
1129    }
1130
1131    // WAL checkpoint before summary
1132    let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);");
1133
1134    // Stage 4: Summary
1135    emit_json(&Summary {
1136        summary: true,
1137        files_total: total,
1138        completed,
1139        failed,
1140        skipped,
1141        entities_total,
1142        rels_total,
1143        input_tokens_total,
1144        output_tokens_total,
1145        elapsed_ms: started.elapsed().as_millis() as u64,
1146    });
1147
1148    if !args.keep_queue && failed == 0 {
1149        let _ = std::fs::remove_file(&queue_path);
1150    }
1151
1152    Ok(())
1153}
1154
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one `item.completed` JSONL event whose item is an
    /// `agent_message` carrying `text` as a JSON-encoded string.
    fn make_agent_message_event(text: &str) -> String {
        let encoded = serde_json::to_string(text).unwrap();
        format!(
            r#"{{"type":"item.completed","item":{{"id":"item_0","type":"agent_message","text":{encoded}}}}}"#
        )
    }

    /// Builds one `turn.completed` JSONL event reporting the given token counts.
    fn make_usage_event(input: u64, output: u64) -> String {
        let usage = format!(r#"{{"input_tokens":{input},"output_tokens":{output}}}"#);
        format!(r#"{{"type":"turn.completed","usage":{usage}}}"#)
    }

    /// Returns an extraction payload with one entity and one relationship.
    fn valid_extraction_json() -> String {
        String::from(
            r#"{"name":"test-module","description":"A test module for unit testing purposes","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-module","relation":"applies-to","strength":0.8}]}"#,
        )
    }

    /// A well-formed stream yields the extraction plus the reported usage.
    #[test]
    fn test_parse_codex_output_valid() {
        let events = [
            String::from(r#"{"type":"thread.started","thread_id":"t1"}"#),
            make_agent_message_event(&valid_extraction_json()),
            make_usage_event(100, 50),
        ];
        let stream = events.join("\n");

        let (extraction, usage) = parse_codex_output(&stream).expect("parse must succeed");
        assert_eq!(extraction.name, "test-module");
        assert_eq!(extraction.entities.len(), 1);
        assert_eq!(extraction.relationships.len(), 1);

        let usage = usage.expect("usage must be present");
        assert_eq!(usage.input_tokens, 100);
        assert_eq!(usage.output_tokens, 50);
    }

    /// A `turn.failed` event surfaces as an error that carries its message.
    #[test]
    fn test_parse_codex_output_turn_failed() {
        let stream = [
            r#"{"type":"thread.started","thread_id":"t1"}"#,
            r#"{"type":"turn.failed","error":{"message":"model error occurred"}}"#,
        ]
        .join("\n");

        let err = parse_codex_output(&stream).unwrap_err();
        let msg = err.to_string();
        let mentions_turn_failed = msg.contains("turn failed");
        assert!(mentions_turn_failed, "expected 'turn failed' in: {msg}");
        assert!(msg.contains("model error occurred"));
    }

    /// A `turn.failed` event with a rate-limit message maps to `AppError::RateLimited`.
    #[test]
    fn test_parse_codex_output_rate_limit() {
        let stream = r#"{"type":"turn.failed","error":{"message":"rate_limit exceeded, 429 Too Many Requests"}}"#;

        let err = parse_codex_output(stream).unwrap_err();
        let is_rate_limited = matches!(err, AppError::RateLimited { .. });
        assert!(
            is_rate_limited,
            "expected AppError::RateLimited, got: {err}"
        );
    }

    /// A top-level `error` event about the schema produces a schema-related error.
    #[test]
    fn test_parse_codex_output_schema_error() {
        let stream = r#"{"type":"error","message":"invalid_json_schema: additional properties not allowed"}"#;

        let err = parse_codex_output(stream).unwrap_err();
        let msg = err.to_string();
        let mentions_schema = ["invalid_json_schema", "schema"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(mentions_schema, "expected schema error in: {msg}");
    }

    /// The embedded schema constant must parse as JSON.
    #[test]
    fn test_extraction_schema_codex_valid_json() {
        let _parsed = serde_json::from_str::<serde_json::Value>(EXTRACTION_SCHEMA_CODEX)
            .expect("schema must be valid JSON");
    }

    /// The schema sets `additionalProperties: false` at the root and on both
    /// the entity and relationship item definitions.
    #[test]
    fn test_extraction_schema_codex_has_additional_properties_false() {
        let schema = serde_json::from_str::<serde_json::Value>(EXTRACTION_SCHEMA_CODEX)
            .expect("schema must be valid JSON");

        let entity_items = &schema["properties"]["entities"]["items"];
        let relationship_items = &schema["properties"]["relationships"]["items"];

        // Checked in the same order as the nesting: root, entities, relationships.
        let checks = [
            (&schema, "root must have additionalProperties: false"),
            (
                entity_items,
                "entity items must have additionalProperties: false",
            ),
            (
                relationship_items,
                "relationship items must have additionalProperties: false",
            ),
        ];

        for (node, message) in checks {
            assert_eq!(
                node["additionalProperties"].as_bool(),
                Some(false),
                "{message}"
            );
        }
    }

    /// With several `agent_message` items in the stream, the last one is the result.
    #[test]
    fn test_parse_codex_output_last_agent_message_wins() {
        let earlier = r#"{"name":"first-result","description":"First result should be ignored","entities":[],"relationships":[]}"#;
        let latest = r#"{"name":"final-result","description":"Final result wins over earlier ones","entities":[{"name":"final-entity","entity_type":"concept"}],"relationships":[]}"#;

        let events = [
            String::from(r#"{"type":"thread.started","thread_id":"t1"}"#),
            make_agent_message_event(earlier),
            make_agent_message_event(latest),
            make_usage_event(200, 80),
        ];
        let stream = events.join("\n");

        let (extraction, _usage) = parse_codex_output(&stream).expect("parse must succeed");
        assert_eq!(
            extraction.name, "final-result",
            "last agent_message should win"
        );
        assert_eq!(extraction.entities.len(), 1);
    }

    /// Lines that are not valid JSON are ignored rather than failing the parse.
    #[test]
    fn test_parse_codex_output_skips_malformed_lines() {
        let lines = [
            String::from("not json at all"),
            make_agent_message_event(&valid_extraction_json()),
            String::from("{broken"),
            make_usage_event(10, 5),
        ];
        let stream = lines.join("\n");

        // Parsing must succeed even though two of the four lines are garbage.
        let (extraction, _usage) =
            parse_codex_output(&stream).expect("malformed lines must be skipped");
        assert_eq!(extraction.name, "test-module");
    }
}