Skip to main content

sqlite_graphrag/commands/
ingest_codex.rs

1//! Handler for `ingest --mode codex`.
2//!
3//! Orchestrates the locally installed OpenAI Codex CLI binary (`codex exec`)
4//! to extract domain-specific entities and relationships from each file,
5//! then persists them with full embedding pipeline for recall/hybrid-search.
6//!
7//! Architecture: P1 One-Shot per file — each file spawns a separate
8//! `codex exec` process with `--output-schema` for guaranteed structured output.
9//! A SQLite queue DB tracks progress for resume/retry support.
10
11use crate::commands::ingest::IngestArgs;
12use crate::commands::ingest_claude::ExtractionResult;
13use crate::entity_type::EntityType;
14use crate::errors::AppError;
15use crate::paths::AppPaths;
16use crate::storage::connection::{ensure_db_ready, open_rw};
17use crate::storage::entities::{self, NewEntity, NewRelationship};
18use crate::storage::memories::{self, NewMemory};
19
20use rusqlite::Connection;
21use serde::{Deserialize, Serialize};
22use std::io::Write;
23use std::path::{Path, PathBuf};
24use std::process::{Command, Stdio};
25use std::time::Instant;
26
/// Oldest `codex` CLI version accepted by `validate_codex_version`; older
/// binaries are rejected before any file is processed.
///
/// NOTE(review): presumably the first release supporting every `codex exec`
/// flag passed by `extract_with_codex` (`--output-schema`, `--ephemeral`,
/// `--ignore-user-config`, `--ignore-rules`, ...) — confirm against the Codex changelog.
const MIN_CODEX_VERSION: &str = "0.120.0";
28
/// OpenAI structured output schema with `additionalProperties: false` at all nested levels.
///
/// Written to a temp file and passed to `codex exec --output-schema`. Every
/// declared property is also listed in `required`.
///
/// Keep the enums in sync with their consumers:
/// - `entity_type` values must parse as `EntityType`, because entities whose
///   type fails to parse are skipped during ingest.
/// - `relation` values mirror the list spelled out in `EXTRACTION_PROMPT`.
const EXTRACTION_SCHEMA_CODEX: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
71
/// Instruction prompt written to codex's stdin, followed by a `---` separator
/// and the document content.
///
/// The trailing `\` on each line continues the string literal without
/// inserting the source newline; the explicit `\n` escapes supply the line breaks.
/// The relationship types listed below mirror the `relation` enum in
/// `EXTRACTION_SCHEMA_CODEX`.
///
/// NOTE(review): item 3 names only a subset of the schema's `entity_type` enum,
/// which also allows `incident`, `organization`, `location` and `date`. Confirm
/// that this is intended.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
85
/// Token usage reported by Codex CLI on `turn.completed` events.
///
/// Deserialized from the event's `usage` object. `input_tokens` and
/// `output_tokens` have no serde default, so an object missing either one
/// fails to deserialize as a whole. The other two counters fall back to 0 when absent.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct CodexUsage {
    /// Input token count as reported by Codex (required).
    input_tokens: u64,
    /// Cached input token count; 0 when the field is absent.
    #[serde(default)]
    cached_input_tokens: u64,
    /// Output token count as reported by Codex (required).
    output_tokens: u64,
    /// Reasoning output token count; 0 when the field is absent.
    #[serde(default)]
    reasoning_output_tokens: u64,
}
96
/// One-line JSON progress event emitted at a pipeline stage (e.g. `validate`, `scan`).
///
/// Every optional field set to `None` is omitted from the serialized line, so
/// each phase only carries the keys relevant to it.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Stage name, e.g. `"validate"` or `"scan"`.
    phase: &'a str,
    /// Resolved path of the codex binary (validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    codex_path: Option<&'a str>,
    /// Trimmed `codex --version` output (validate phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Directory being scanned (scan phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of files matched by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Files newly inserted into the queue DB.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Files whose queue row already existed (insert ignored).
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
113
/// One-line JSON progress event emitted for each processed (or previewed) file.
///
/// Optional fields set to `None` are omitted from the serialized line.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the source file.
    file: &'a str,
    /// Memory name for the file; empty when the file failed before extraction.
    name: &'a str,
    /// Outcome label, e.g. `"preview"` (dry run) or `"failed"`.
    status: &'a str,
    /// Id of the stored memory, when one was created.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of entities extracted.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships extracted.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Always None for Codex (no cost_usd in Codex API responses).
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Input tokens reported by Codex for this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    input_tokens: Option<u64>,
    /// Output tokens reported by Codex for this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    output_tokens: Option<u64>,
    /// Wall-clock time spent on this file, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Failure reason when `status` is `"failed"`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Zero-based position of this file within the run.
    index: usize,
    /// Total number of files matched by the scan.
    total: usize,
}
139
/// Final JSON line summarizing an ingest run.
#[derive(Debug, Serialize)]
struct Summary {
    /// Set to `true` by the emitters. Presumably it lets consumers tell this
    /// line apart from phase/file events (TODO confirm).
    summary: bool,
    /// Number of files matched by the scan.
    files_total: usize,
    /// Files successfully processed in this run.
    completed: usize,
    /// Files that failed in this run.
    failed: usize,
    /// Queue rows already marked `done` when processing started.
    skipped: usize,
    /// Sum of extracted entities across completed files.
    entities_total: usize,
    /// Sum of extracted relationships across completed files.
    rels_total: usize,
    /// Sum of input tokens reported by Codex.
    input_tokens_total: u64,
    /// Sum of output tokens reported by Codex.
    output_tokens_total: u64,
    /// Wall-clock duration of the whole run, in milliseconds.
    elapsed_ms: u64,
}
153
154/// Locates the Codex CLI binary on the system.
155///
156/// Search order:
157/// 1. Explicit `--codex-binary` CLI flag.
158/// 2. `SQLITE_GRAPHRAG_CODEX_BINARY` env var.
159/// 3. PATH search for `codex` (or `codex.exe` on Windows).
160pub fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
161    if let Some(p) = explicit {
162        if p.exists() {
163            return Ok(p.to_path_buf());
164        }
165        return Err(AppError::Validation(format!(
166            "Codex CLI binary not found at explicit path: {}",
167            p.display()
168        )));
169    }
170
171    if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
172        let p = PathBuf::from(&env_path);
173        if p.exists() {
174            return Ok(p);
175        }
176    }
177
178    let name = if cfg!(windows) { "codex.exe" } else { "codex" };
179    if let Some(path_var) = std::env::var_os("PATH") {
180        for dir in std::env::split_paths(&path_var) {
181            let candidate = dir.join(name);
182            if candidate.exists() {
183                return Ok(candidate);
184            }
185        }
186    }
187
188    Err(AppError::Validation(
189        "Codex CLI binary not found in PATH. Install it from https://github.com/openai/codex or specify --codex-binary".to_string(),
190    ))
191}
192
193/// Validates that the Codex CLI binary meets the minimum version requirement.
194///
195/// # Errors
196///
197/// Returns `AppError::Validation` when the binary cannot be executed or the
198/// version is below `MIN_CODEX_VERSION`.
199fn validate_codex_version(binary: &Path) -> Result<String, AppError> {
200    let output = Command::new(binary)
201        .arg("--version")
202        .stdin(Stdio::null())
203        .stdout(Stdio::piped())
204        .stderr(Stdio::piped())
205        .output()
206        .map_err(AppError::Io)?;
207
208    let raw = String::from_utf8(output.stdout)
209        .map_err(|_| AppError::Validation("codex --version output is not UTF-8".to_string()))?;
210
211    let version_str = raw.trim().to_string();
212
213    // Codex CLI outputs: "codex-cli 0.133.0" or just "0.133.0"
214    let numeric = version_str.split_whitespace().last().unwrap_or("").trim();
215
216    fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
217        let parts: Vec<&str> = s.splitn(3, '.').collect();
218        if parts.len() < 2 {
219            return None;
220        }
221        let major = parts[0].parse::<u64>().ok()?;
222        let minor = parts[1].parse::<u64>().ok()?;
223        let patch = parts
224            .get(2)
225            .and_then(|p| p.parse::<u64>().ok())
226            .unwrap_or(0);
227        Some((major, minor, patch))
228    }
229
230    if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CODEX_VERSION)) {
231        if actual < min {
232            return Err(AppError::Validation(format!(
233                "Codex CLI version {numeric} is below minimum required {MIN_CODEX_VERSION}"
234            )));
235        }
236    }
237
238    Ok(version_str)
239}
240
241/// Writes the extraction schema to a named temp file for `--output-schema`.
242///
243/// # Errors
244///
245/// Returns `AppError::Io` when the temp file cannot be created or written.
246fn write_schema_tempfile() -> Result<tempfile::NamedTempFile, AppError> {
247    let mut f = tempfile::NamedTempFile::new().map_err(AppError::Io)?;
248    std::io::Write::write_all(&mut f, EXTRACTION_SCHEMA_CODEX.as_bytes()).map_err(AppError::Io)?;
249    std::io::Write::flush(&mut f).map_err(AppError::Io)?;
250    Ok(f)
251}
252
253/// Invokes `codex exec` for a single file and returns the extraction result.
254///
255/// Uses `wait-timeout` for cross-platform subprocess timeout, `env_clear()`
256/// for least-privilege environment, and reads prompt + file content from
257/// stdin using the `-` argument (Codex Paperclip pattern).
258///
259/// # Errors
260///
261/// Returns `AppError::Validation` on extraction failure, rate limiting, or
262/// schema errors. Returns `AppError::Io` on process spawn/IO failures.
263fn extract_with_codex(
264    binary: &Path,
265    file_content: &[u8],
266    model: Option<&str>,
267    timeout_secs: u64,
268    schema_file: &Path,
269) -> Result<(ExtractionResult, Option<CodexUsage>), AppError> {
270    use wait_timeout::ChildExt;
271
272    let mut cmd = Command::new(binary);
273
274    cmd.env_clear();
275    for var in &[
276        "PATH",
277        "HOME",
278        "USER",
279        "SHELL",
280        "TERM",
281        "LANG",
282        "XDG_CONFIG_HOME",
283        "XDG_DATA_HOME",
284        "XDG_RUNTIME_DIR",
285        "XDG_CACHE_HOME",
286        "OPENAI_API_KEY",
287        "CODEX_ACCESS_TOKEN",
288        "CODEX_HOME",
289        "TMPDIR",
290        "TMP",
291        "TEMP",
292        "DYLD_FALLBACK_LIBRARY_PATH",
293    ] {
294        if let Ok(val) = std::env::var(var) {
295            cmd.env(var, val);
296        }
297    }
298
299    #[cfg(windows)]
300    for var in &[
301        "LOCALAPPDATA",
302        "APPDATA",
303        "USERPROFILE",
304        "SystemRoot",
305        "COMSPEC",
306        "PATHEXT",
307    ] {
308        if let Ok(val) = std::env::var(var) {
309            cmd.env(var, val);
310        }
311    }
312
313    cmd.arg("exec")
314        .arg("--json")
315        .arg("--output-schema")
316        .arg(schema_file)
317        .arg("--ephemeral")
318        .arg("--skip-git-repo-check")
319        .arg("--sandbox")
320        .arg("read-only")
321        .arg("--ignore-user-config")
322        .arg("--ignore-rules");
323
324    if let Some(m) = model {
325        cmd.arg("-m").arg(m);
326    }
327
328    // `-` means: read the prompt from stdin (Paperclip pattern)
329    cmd.arg("-");
330
331    cmd.stdin(Stdio::piped())
332        .stdout(Stdio::piped())
333        .stderr(Stdio::piped());
334
335    let mut child = cmd.spawn().map_err(|e| {
336        AppError::Io(std::io::Error::new(
337            e.kind(),
338            format!("failed to spawn codex: {e}"),
339        ))
340    })?;
341
342    // Build stdin: prompt + document content
343    let file_utf8 = String::from_utf8_lossy(file_content);
344    let stdin_payload = format!("{EXTRACTION_PROMPT}\n\n---\n\nDocument content:\n\n{file_utf8}");
345    let stdin_bytes = stdin_payload.into_bytes();
346
347    let mut child_stdin = child
348        .stdin
349        .take()
350        .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
351    let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
352        child_stdin.write_all(&stdin_bytes)?;
353        Ok(())
354    });
355
356    let timeout = std::time::Duration::from_secs(timeout_secs);
357    let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
358
359    match status {
360        Some(exit_status) => {
361            stdin_thread
362                .join()
363                .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
364                .map_err(AppError::Io)?;
365
366            let mut stdout_buf = Vec::new();
367            let mut stderr_buf = Vec::new();
368            if let Some(mut out) = child.stdout.take() {
369                std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
370            }
371            if let Some(mut err) = child.stderr.take() {
372                std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
373            }
374
375            if !exit_status.success() {
376                let stderr_str = String::from_utf8_lossy(&stderr_buf);
377                let stdout_str = String::from_utf8_lossy(&stdout_buf);
378                // Check if stdout has JSONL with an error event before falling back
379                if let Ok((result, usage)) = parse_codex_output(&stdout_str) {
380                    return Ok((result, usage));
381                }
382                if stderr_str.contains("401")
383                    || stderr_str.contains("Unauthorized")
384                    || stderr_str.contains("auth")
385                {
386                    tracing::warn!(
387                        target: "ingest",
388                        "Codex CLI authentication expired. Re-authenticate with: codex auth login"
389                    );
390                }
391                return Err(AppError::Validation(format!(
392                    "codex exec exited with code {:?}: {}",
393                    exit_status.code(),
394                    stderr_str.trim()
395                )));
396            }
397
398            let stdout = String::from_utf8(stdout_buf)
399                .map_err(|_| AppError::Validation("codex exec stdout is not valid UTF-8".into()))?;
400            parse_codex_output(&stdout)
401        }
402        None => {
403            tracing::warn!(target: "ingest", timeout_secs, "codex exec timed out, killing process");
404            let _ = child.kill();
405            let _ = child.wait();
406            let _ = stdin_thread.join();
407            Err(AppError::Validation(format!(
408                "codex exec timed out after {timeout_secs} seconds"
409            )))
410        }
411    }
412}
413
414/// Parses JSONL output from `codex exec --json`.
415///
416/// Event format (DOTS notation):
417/// - `thread.started` — session init
418/// - `turn.started` — model turn begins
419/// - `item.completed` — message or tool call; last `agent_message` wins
420/// - `turn.completed` — includes usage stats
421/// - `turn.failed` — error with optional rate-limit indicator
422/// - `error` — schema or validation error
423///
424/// # Errors
425///
426/// Returns `AppError::Validation` when no agent_message is found, when the
427/// turn failed, or when the extracted JSON cannot be parsed as `ExtractionResult`.
428fn parse_codex_output(stdout: &str) -> Result<(ExtractionResult, Option<CodexUsage>), AppError> {
429    let mut last_agent_text: Option<String> = None;
430    let mut usage: Option<CodexUsage> = None;
431    let mut rate_limited = false;
432    let mut schema_error = false;
433    let mut turn_failed = false;
434    let mut failed_message = String::new();
435
436    for line in stdout.lines() {
437        let line = line.trim();
438        if line.is_empty() {
439            continue;
440        }
441
442        let event: serde_json::Value = match serde_json::from_str(line) {
443            Ok(v) => v,
444            Err(_) => {
445                tracing::warn!(target: "ingest", line, "codex output: skipping malformed JSONL line");
446                continue;
447            }
448        };
449
450        let event_type = match event.get("type").and_then(|t| t.as_str()) {
451            Some(t) => t,
452            None => continue,
453        };
454
455        match event_type {
456            "item.completed" => {
457                // Last agent_message wins (reasoning / tool calls may appear before)
458                if let Some(item) = event.get("item") {
459                    if item.get("type").and_then(|t| t.as_str()) == Some("agent_message") {
460                        if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
461                            last_agent_text = Some(text.to_string());
462                        }
463                    }
464                }
465            }
466            "turn.completed" => {
467                if let Some(u) = event.get("usage") {
468                    if let Ok(parsed) = serde_json::from_value::<CodexUsage>(u.clone()) {
469                        usage = Some(parsed);
470                    }
471                }
472            }
473            "turn.failed" => {
474                turn_failed = true;
475                if let Some(err) = event.get("error") {
476                    let msg = err
477                        .get("message")
478                        .and_then(|m| m.as_str())
479                        .unwrap_or("unknown error");
480                    failed_message = msg.to_string();
481                    if msg.contains("rate_limit")
482                        || msg.contains("429")
483                        || msg.contains("Too Many Requests")
484                    {
485                        rate_limited = true;
486                    }
487                }
488            }
489            "error" => {
490                if let Some(msg) = event.get("message").and_then(|m| m.as_str()) {
491                    if msg.contains("invalid_json_schema") || msg.contains("schema") {
492                        schema_error = true;
493                    }
494                    tracing::warn!(target: "ingest", error_msg = msg, "codex error event received");
495                }
496            }
497            _ => {
498                // Gracefully skip unknown event types (thread.started, turn.started, etc.)
499            }
500        }
501    }
502
503    if rate_limited {
504        return Err(AppError::Validation(format!(
505            "RATE_LIMITED: {failed_message}"
506        )));
507    }
508
509    if schema_error {
510        return Err(AppError::Validation(
511            "codex rejected the output schema (invalid_json_schema)".to_string(),
512        ));
513    }
514
515    if turn_failed {
516        return Err(AppError::Validation(format!(
517            "codex turn failed: {failed_message}"
518        )));
519    }
520
521    let text = last_agent_text.ok_or_else(|| {
522        AppError::Validation("codex output contained no agent_message item".to_string())
523    })?;
524
525    let extraction: ExtractionResult = serde_json::from_str(&text).map_err(|e| {
526        AppError::Validation(format!(
527            "failed to parse codex agent_message as ExtractionResult: {e}. text={text}"
528        ))
529    })?;
530
531    Ok((extraction, usage))
532}
533
534fn emit_json<T: Serialize>(value: &T) {
535    if let Ok(json) = serde_json::to_string(value) {
536        let stdout = std::io::stdout();
537        let mut lock = stdout.lock();
538        let _ = writeln!(lock, "{json}");
539        let _ = lock.flush();
540    }
541}
542
543/// Collects files matching the pattern (reuses ingest logic).
544fn collect_matching_files(
545    dir: &Path,
546    pattern: &str,
547    recursive: bool,
548    max_files: usize,
549) -> Result<Vec<PathBuf>, AppError> {
550    let mut files = Vec::new();
551    super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
552    files.sort();
553
554    if files.len() > max_files {
555        return Err(AppError::Validation(format!(
556            "found {} files, exceeds --max-files cap of {}",
557            files.len(),
558            max_files
559        )));
560    }
561
562    Ok(files)
563}
564
565/// Opens or creates the queue database for tracking ingest progress.
566fn open_queue_db(path: &str) -> Result<Connection, AppError> {
567    let conn = Connection::open(path)?;
568
569    conn.execute_batch(
570        "PRAGMA journal_mode=WAL;
571        CREATE TABLE IF NOT EXISTS queue (
572            id          INTEGER PRIMARY KEY AUTOINCREMENT,
573            file_path   TEXT NOT NULL UNIQUE,
574            name        TEXT,
575            status      TEXT NOT NULL DEFAULT 'pending',
576            memory_id   INTEGER,
577            entities    INTEGER DEFAULT 0,
578            rels        INTEGER DEFAULT 0,
579            error       TEXT,
580            input_tokens  INTEGER DEFAULT 0,
581            output_tokens INTEGER DEFAULT 0,
582            attempt     INTEGER DEFAULT 0,
583            elapsed_ms  INTEGER,
584            created_at  TEXT DEFAULT (datetime('now')),
585            done_at     TEXT
586        );
587        CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
588    )?;
589
590    Ok(conn)
591}
592
593/// Main entry point for `ingest --mode codex`.
594///
595/// # Errors
596///
597/// Returns `AppError` on directory/DB access failures or fatal extraction errors.
598pub fn run_codex_ingest(args: &IngestArgs) -> Result<(), AppError> {
599    let started = Instant::now();
600
601    if !args.dir.exists() {
602        return Err(AppError::Validation(format!(
603            "directory not found: {}",
604            args.dir.display()
605        )));
606    }
607
608    // Stage 1: Validate binary
609    let codex_binary = find_codex_binary(args.codex_binary.as_deref())?;
610    let version = validate_codex_version(&codex_binary)?;
611    tracing::info!(
612        target: "ingest",
613        binary = %codex_binary.display(),
614        version = %version,
615        "Codex CLI binary validated"
616    );
617
618    emit_json(&PhaseEvent {
619        phase: "validate",
620        codex_path: codex_binary.to_str(),
621        version: Some(&version),
622        dir: None,
623        files_total: None,
624        files_new: None,
625        files_existing: None,
626    });
627
628    // Stage 2: Scan files
629    let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
630
631    let queue_conn = open_queue_db(&args.queue_db)?;
632
633    if args.resume {
634        let reset = queue_conn
635            .execute(
636                "UPDATE queue SET status='pending' WHERE status='processing'",
637                [],
638            )
639            .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
640        if reset > 0 {
641            tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
642        }
643    }
644
645    if args.retry_failed {
646        let count = queue_conn
647            .execute(
648                "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
649                [],
650            )
651            .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
652        tracing::info!(target: "ingest", count, "retrying failed files");
653    }
654
655    if !args.resume && !args.retry_failed {
656        queue_conn
657            .execute("DELETE FROM queue", [])
658            .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
659    }
660
661    let mut new_count = 0usize;
662    let mut existing_count = 0usize;
663
664    if !args.retry_failed {
665        for file in &files {
666            let file_str = file.to_string_lossy().to_string();
667            let inserted = queue_conn
668                .execute(
669                    "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
670                    rusqlite::params![file_str],
671                )
672                .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
673            if inserted > 0 {
674                new_count += 1;
675            } else {
676                existing_count += 1;
677            }
678        }
679    }
680
681    emit_json(&PhaseEvent {
682        phase: "scan",
683        codex_path: None,
684        version: None,
685        dir: args.dir.to_str(),
686        files_total: Some(files.len()),
687        files_new: Some(new_count),
688        files_existing: Some(existing_count),
689    });
690
691    if args.dry_run {
692        for (idx, file) in files.iter().enumerate() {
693            let (name, _truncated, _orig) =
694                super::ingest::derive_kebab_name(file, args.max_name_length);
695            emit_json(&FileEvent {
696                file: &file.to_string_lossy(),
697                name: &name,
698                status: "preview",
699                memory_id: None,
700                entities: None,
701                rels: None,
702                cost_usd: None,
703                input_tokens: None,
704                output_tokens: None,
705                elapsed_ms: None,
706                error: None,
707                index: idx,
708                total: files.len(),
709            });
710        }
711        emit_json(&Summary {
712            summary: true,
713            files_total: files.len(),
714            completed: 0,
715            failed: 0,
716            skipped: 0,
717            entities_total: 0,
718            rels_total: 0,
719            input_tokens_total: 0,
720            output_tokens_total: 0,
721            elapsed_ms: started.elapsed().as_millis() as u64,
722        });
723        if !args.keep_queue {
724            let _ = std::fs::remove_file(&args.queue_db);
725        }
726        return Ok(());
727    }
728
729    // Stage 3: Process files
730    let paths = AppPaths::resolve(args.db.as_deref())?;
731    ensure_db_ready(&paths)?;
732    let conn = open_rw(&paths.db)?;
733    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
734    let memory_type_str = args.r#type.as_str().to_string();
735
736    // Write schema to temp file once (reused across all files)
737    let schema_tempfile = write_schema_tempfile()?;
738    let schema_path = schema_tempfile.path().to_path_buf();
739
740    let mut completed = 0usize;
741    let mut failed = 0usize;
742    let skipped_initial: usize = queue_conn
743        .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
744            r.get::<_, usize>(0)
745        })
746        .unwrap_or(0);
747    let skipped = skipped_initial;
748    let mut entities_total = 0usize;
749    let mut rels_total = 0usize;
750    let mut input_tokens_total = 0u64;
751    let mut output_tokens_total = 0u64;
752    let total = files.len();
753
754    let mut backoff_secs = args.rate_limit_wait;
755
756    loop {
757        let pending: Option<(i64, String)> = queue_conn
758            .query_row(
759                "UPDATE queue SET status='processing', attempt=attempt+1 \
760                 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
761                 RETURNING id, file_path",
762                [],
763                |row| Ok((row.get(0)?, row.get(1)?)),
764            )
765            .ok();
766
767        let (queue_id, file_path) = match pending {
768            Some(p) => p,
769            None => break,
770        };
771
772        let file_started = Instant::now();
773
774        // Reject files that exceed the 10 MB stdin limit
775        const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
776        if let Ok(meta) = std::fs::metadata(&file_path) {
777            if meta.len() > MAX_FILE_SIZE {
778                let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
779                let _ = queue_conn.execute(
780                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
781                    rusqlite::params![err_msg, queue_id],
782                );
783                let current_index = completed + failed + skipped;
784                failed += 1;
785                emit_json(&FileEvent {
786                    file: &file_path,
787                    name: "",
788                    status: "failed",
789                    memory_id: None,
790                    entities: None,
791                    rels: None,
792                    cost_usd: None,
793                    input_tokens: None,
794                    output_tokens: None,
795                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
796                    error: Some(&err_msg),
797                    index: current_index,
798                    total,
799                });
800                if args.fail_fast {
801                    break;
802                }
803                continue;
804            }
805        }
806
807        let file_content = match std::fs::read(&file_path) {
808            Ok(c) => c,
809            Err(e) => {
810                let err_msg = format!("IO error: {e}");
811                let _ = queue_conn.execute(
812                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
813                    rusqlite::params![err_msg, queue_id],
814                );
815                let current_index = completed + failed + skipped;
816                failed += 1;
817                emit_json(&FileEvent {
818                    file: &file_path,
819                    name: "",
820                    status: "failed",
821                    memory_id: None,
822                    entities: None,
823                    rels: None,
824                    cost_usd: None,
825                    input_tokens: None,
826                    output_tokens: None,
827                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
828                    error: Some(&err_msg),
829                    index: current_index,
830                    total,
831                });
832                if args.fail_fast {
833                    break;
834                }
835                continue;
836            }
837        };
838
839        // Retry once on cold-start failure
840        let max_extract_attempts: u32 = 2;
841        let mut extraction_result: Option<(ExtractionResult, Option<CodexUsage>)> = None;
842        let mut last_extract_err: Option<String> = None;
843
844        for attempt in 1..=max_extract_attempts {
845            match extract_with_codex(
846                &codex_binary,
847                &file_content,
848                args.codex_model.as_deref(),
849                args.codex_timeout,
850                &schema_path,
851            ) {
852                Ok(result) => {
853                    extraction_result = Some(result);
854                    break;
855                }
856                Err(ref e) if format!("{e}").contains("RATE_LIMITED") => {
857                    last_extract_err = Some(format!("{e}"));
858                    break;
859                }
860                Err(e) => {
861                    let msg = format!("{e}");
862                    if attempt < max_extract_attempts {
863                        tracing::warn!(
864                            target: "ingest",
865                            attempt,
866                            error = %msg,
867                            "codex extraction failed, retrying"
868                        );
869                        std::thread::sleep(std::time::Duration::from_secs(2));
870                    }
871                    last_extract_err = Some(msg);
872                }
873            }
874        }
875
876        if let Some((extraction, usage)) = extraction_result {
877            backoff_secs = args.rate_limit_wait;
878
879            let in_tok = usage.as_ref().map(|u| u.input_tokens).unwrap_or(0);
880            let out_tok = usage.as_ref().map(|u| u.output_tokens).unwrap_or(0);
881
882            let name = &extraction.name;
883            let ent_count = extraction.entities.len();
884            let rel_count = extraction.relationships.len();
885
886            let new_entities: Vec<NewEntity> = extraction
887                .entities
888                .iter()
889                .filter_map(|e| match e.entity_type.parse::<EntityType>() {
890                    Ok(et) => Some(NewEntity {
891                        name: e.name.clone(),
892                        entity_type: et,
893                        description: None,
894                    }),
895                    Err(_) => {
896                        tracing::warn!(
897                            target: "ingest",
898                            entity = %e.name,
899                            entity_type = %e.entity_type,
900                            "entity type not recognized, skipping"
901                        );
902                        None
903                    }
904                })
905                .collect();
906
907            let new_relationships: Vec<NewRelationship> = extraction
908                .relationships
909                .iter()
910                .map(|r| NewRelationship {
911                    source: r.source.clone(),
912                    target: r.target.clone(),
913                    relation: crate::parsers::normalize_relation(&r.relation),
914                    strength: r.strength,
915                    description: None,
916                })
917                .collect();
918
919            let body_str = String::from_utf8_lossy(&file_content);
920            let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
921            let new_memory = NewMemory {
922                name: name.clone(),
923                namespace: namespace.clone(),
924                memory_type: memory_type_str.clone(),
925                description: extraction.description.clone(),
926                body: body_str.to_string(),
927                body_hash,
928                session_id: None,
929                source: "agent".to_string(),
930                metadata: serde_json::Value::Object(serde_json::Map::new()),
931            };
932
933            // Deduplication: update existing memory instead of failing on UNIQUE
934            let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
935                Some((existing_id, is_deleted)) => {
936                    if is_deleted {
937                        memories::clear_deleted_at(&conn, existing_id)?;
938                    }
939                    let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
940                        "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
941                        rusqlite::params![existing_id],
942                        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
943                    )?;
944                    memories::update(&conn, existing_id, &new_memory, None)?;
945                    memories::sync_fts_after_update(
946                        &conn,
947                        existing_id,
948                        &old_name,
949                        &old_desc,
950                        &old_body,
951                        &new_memory.name,
952                        &new_memory.description,
953                        &new_memory.body,
954                    )?;
955                    tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
956                    existing_id
957                }
958                None => match memories::insert(&conn, &new_memory) {
959                    Ok(id) => id,
960                    Err(e) => {
961                        let err_msg = format!("{e}");
962                        let _ = queue_conn.execute(
963                            "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
964                            rusqlite::params![err_msg, queue_id],
965                        );
966                        let current_index = completed + failed + skipped;
967                        failed += 1;
968                        emit_json(&FileEvent {
969                            file: &file_path,
970                            name,
971                            status: "failed",
972                            memory_id: None,
973                            entities: None,
974                            rels: None,
975                            cost_usd: None,
976                            input_tokens: Some(in_tok),
977                            output_tokens: Some(out_tok),
978                            elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
979                            error: Some(&err_msg),
980                            index: current_index,
981                            total,
982                        });
983                        input_tokens_total += in_tok;
984                        output_tokens_total += out_tok;
985                        if args.fail_fast {
986                            break;
987                        }
988                        continue;
989                    }
990                },
991            };
992
993            for ent in &new_entities {
994                if let Ok(eid) = entities::upsert_entity(&conn, &namespace, ent) {
995                    let _ = entities::link_memory_entity(&conn, memory_id, eid);
996                }
997            }
998            for rel in &new_relationships {
999                crate::parsers::warn_if_non_canonical(&rel.relation);
1000                let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
1001                let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
1002                if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1003                    let _ = conn.execute(
1004                        "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1005                        rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1006                    );
1007                }
1008            }
1009
1010            let _ = queue_conn.execute(
1011                "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, \
1012                 input_tokens=?5, output_tokens=?6, elapsed_ms=?7, done_at=datetime('now') WHERE id=?8",
1013                rusqlite::params![
1014                    name,
1015                    memory_id,
1016                    ent_count,
1017                    rel_count,
1018                    in_tok,
1019                    out_tok,
1020                    file_started.elapsed().as_millis() as i64,
1021                    queue_id
1022                ],
1023            );
1024
1025            let current_index = completed + failed + skipped;
1026            completed += 1;
1027            entities_total += ent_count;
1028            rels_total += rel_count;
1029            input_tokens_total += in_tok;
1030            output_tokens_total += out_tok;
1031
1032            emit_json(&FileEvent {
1033                file: &file_path,
1034                name,
1035                status: "done",
1036                memory_id: Some(memory_id),
1037                entities: Some(ent_count),
1038                rels: Some(rel_count),
1039                cost_usd: None,
1040                input_tokens: Some(in_tok),
1041                output_tokens: Some(out_tok),
1042                elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1043                error: None,
1044                index: current_index,
1045                total,
1046            });
1047        } else if let Some(ref err_str) = last_extract_err {
1048            if err_str.contains("RATE_LIMITED") {
1049                tracing::warn!(
1050                    target: "ingest",
1051                    wait_seconds = backoff_secs,
1052                    "rate limited by Codex API, waiting before retry"
1053                );
1054                let _ = queue_conn.execute(
1055                    "UPDATE queue SET status='pending' WHERE id=?1",
1056                    rusqlite::params![queue_id],
1057                );
1058                std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
1059                backoff_secs = (backoff_secs * 2).min(900);
1060                continue;
1061            } else {
1062                let _ = queue_conn.execute(
1063                    "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1064                    rusqlite::params![err_str, queue_id],
1065                );
1066                let current_index = completed + failed + skipped;
1067                failed += 1;
1068                emit_json(&FileEvent {
1069                    file: &file_path,
1070                    name: "",
1071                    status: "failed",
1072                    memory_id: None,
1073                    entities: None,
1074                    rels: None,
1075                    cost_usd: None,
1076                    input_tokens: None,
1077                    output_tokens: None,
1078                    elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1079                    error: Some(err_str),
1080                    index: current_index,
1081                    total,
1082                });
1083                if args.fail_fast {
1084                    break;
1085                }
1086            }
1087        }
1088    }
1089
1090    // WAL checkpoint before summary
1091    let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);");
1092
1093    // Stage 4: Summary
1094    emit_json(&Summary {
1095        summary: true,
1096        files_total: total,
1097        completed,
1098        failed,
1099        skipped,
1100        entities_total,
1101        rels_total,
1102        input_tokens_total,
1103        output_tokens_total,
1104        elapsed_ms: started.elapsed().as_millis() as u64,
1105    });
1106
1107    if !args.keep_queue && failed == 0 {
1108        let _ = std::fs::remove_file(&args.queue_db);
1109    }
1110
1111    Ok(())
1112}
1113
#[cfg(test)]
mod tests {
    use super::*;

    /// First record of every synthetic stream: a `thread.started` event.
    const THREAD_STARTED: &str = r#"{"type":"thread.started","thread_id":"t1"}"#;

    /// Extraction payload shared by several tests: a memory named `test-module`
    /// with exactly one entity and one relationship.
    const VALID_EXTRACTION_JSON: &str = r#"{"name":"test-module","description":"A test module for unit testing purposes","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-module","relation":"applies-to","strength":0.8}]}"#;

    /// Joins individual records into a single newline-separated JSONL string.
    fn jsonl(records: &[&str]) -> String {
        records.join("\n")
    }

    /// Builds an `item.completed` event whose `agent_message` item carries
    /// `text` encoded as a JSON string literal.
    fn make_agent_message_event(text: &str) -> String {
        let escaped = serde_json::to_string(text).unwrap();
        format!(
            r#"{{"type":"item.completed","item":{{"id":"item_0","type":"agent_message","text":{escaped}}}}}"#
        )
    }

    /// Builds a `turn.completed` event reporting the given token counts.
    fn make_usage_event(input: u64, output: u64) -> String {
        let body = format!(r#"{{"input_tokens":{input},"output_tokens":{output}}}"#);
        format!(r#"{{"type":"turn.completed","usage":{body}}}"#)
    }

    #[test]
    fn test_parse_codex_output_valid() {
        let agent_line = make_agent_message_event(VALID_EXTRACTION_JSON);
        let usage_line = make_usage_event(100, 50);
        let stream = jsonl(&[THREAD_STARTED, &agent_line, &usage_line]);

        let (extraction, usage) = parse_codex_output(&stream).expect("parse must succeed");
        assert_eq!(extraction.name, "test-module");
        assert_eq!(extraction.entities.len(), 1);
        assert_eq!(extraction.relationships.len(), 1);

        let usage = usage.expect("usage must be present");
        assert_eq!(usage.input_tokens, 100);
        assert_eq!(usage.output_tokens, 50);
    }

    #[test]
    fn test_parse_codex_output_turn_failed() {
        let stream = jsonl(&[
            THREAD_STARTED,
            r#"{"type":"turn.failed","error":{"message":"model error occurred"}}"#,
        ]);

        let msg = parse_codex_output(&stream).unwrap_err().to_string();
        assert!(msg.contains("turn failed"), "expected 'turn failed' in: {msg}");
        assert!(msg.contains("model error occurred"));
    }

    #[test]
    fn test_parse_codex_output_rate_limit() {
        let stream = r#"{"type":"turn.failed","error":{"message":"rate_limit exceeded, 429 Too Many Requests"}}"#;

        let msg = parse_codex_output(stream).unwrap_err().to_string();
        assert!(msg.contains("RATE_LIMITED"), "expected 'RATE_LIMITED' in: {msg}");
    }

    #[test]
    fn test_parse_codex_output_schema_error() {
        let stream = r#"{"type":"error","message":"invalid_json_schema: additional properties not allowed"}"#;

        let msg = parse_codex_output(stream).unwrap_err().to_string();
        // Any message mentioning "invalid_json_schema" also contains "schema",
        // so the single substring check covers both accepted phrasings.
        assert!(msg.contains("schema"), "expected schema error in: {msg}");
    }

    #[test]
    fn test_extraction_schema_codex_valid_json() {
        let parsed: Result<serde_json::Value, _> = serde_json::from_str(EXTRACTION_SCHEMA_CODEX);
        parsed.expect("schema must be valid JSON");
    }

    #[test]
    fn test_extraction_schema_codex_has_additional_properties_false() {
        let schema: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA_CODEX).expect("schema must be valid JSON");

        // (JSON pointer to an object level, failure message for that level)
        let levels = [
            ("", "root must have additionalProperties: false"),
            (
                "/properties/entities/items",
                "entity items must have additionalProperties: false",
            ),
            (
                "/properties/relationships/items",
                "relationship items must have additionalProperties: false",
            ),
        ];

        for (pointer, message) in levels {
            let flag = schema
                .pointer(&format!("{pointer}/additionalProperties"))
                .and_then(serde_json::Value::as_bool);
            assert_eq!(flag, Some(false), "{message}");
        }
    }

    #[test]
    fn test_parse_codex_output_last_agent_message_wins() {
        // Two agent_message items arrive; only the later one must be kept.
        let earlier = r#"{"name":"first-result","description":"First result should be ignored","entities":[],"relationships":[]}"#;
        let later = r#"{"name":"final-result","description":"Final result wins over earlier ones","entities":[{"name":"final-entity","entity_type":"concept"}],"relationships":[]}"#;

        let earlier_line = make_agent_message_event(earlier);
        let later_line = make_agent_message_event(later);
        let usage_line = make_usage_event(200, 80);
        let stream = jsonl(&[THREAD_STARTED, &earlier_line, &later_line, &usage_line]);

        let (extraction, _) = parse_codex_output(&stream).expect("parse must succeed");
        assert_eq!(extraction.name, "final-result", "last agent_message should win");
        assert_eq!(extraction.entities.len(), 1);
    }

    #[test]
    fn test_parse_codex_output_skips_malformed_lines() {
        let agent_line = make_agent_message_event(VALID_EXTRACTION_JSON);
        let usage_line = make_usage_event(10, 5);
        let stream = jsonl(&["not json at all", &agent_line, "{broken", &usage_line]);

        // Non-JSON noise interleaved with valid events must not abort parsing.
        let (extraction, _) =
            parse_codex_output(&stream).expect("malformed lines must be skipped");
        assert_eq!(extraction.name, "test-module");
    }
}