1use crate::commands::ingest::IngestArgs;
13use crate::commands::ingest_claude::ExtractionResult;
14use crate::entity_type::EntityType;
15use crate::errors::AppError;
16use crate::paths::AppPaths;
17use crate::storage::connection::{ensure_db_ready, open_rw};
18use crate::storage::entities::{self, NewEntity, NewRelationship};
19use crate::storage::memories::{self, NewMemory};
20
21use rusqlite::Connection;
22use serde::{Deserialize, Serialize};
23use std::io::Write;
24use std::path::{Path, PathBuf};
25use std::process::{Command, Stdio};
26use std::time::Instant;
27
/// Lowest Codex CLI version accepted by `validate_codex_version`, written as
/// `major.minor.patch`.
const MIN_CODEX_VERSION: &str = "0.120.0";
29
/// JSON Schema describing the extraction result expected from Codex.
///
/// `write_schema_tempfile` writes this text to a temporary file whose path is
/// passed to `codex exec --output-schema`. Every object lists all of its
/// properties under `required` and sets `additionalProperties` to `false`.
/// The `relation` enum carries the same values that `EXTRACTION_PROMPT`
/// enumerates in its rules.
const EXTRACTION_SCHEMA_CODEX: &str = r#"{
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "description": { "type": "string" },
 "entities": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "name": { "type": "string" },
 "entity_type": {
 "type": "string",
 "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
 }
 },
 "required": ["name", "entity_type"],
 "additionalProperties": false
 }
 },
 "relationships": {
 "type": "array",
 "items": {
 "type": "object",
 "properties": {
 "source": { "type": "string" },
 "target": { "type": "string" },
 "relation": {
 "type": "string",
 "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
 },
 "strength": { "type": "number", "minimum": 0, "maximum": 1 }
 },
 "required": ["source","target","relation","strength"],
 "additionalProperties": false
 }
 }
 },
 "required": ["name","description","entities","relationships"],
 "additionalProperties": false
}"#;
72
/// Instruction text sent to Codex ahead of every document.
///
/// `extract_with_codex` builds the stdin payload as this prompt, a `---`
/// separator, and then the document content. The relationship types listed
/// here are the same values allowed by the `relation` enum in
/// `EXTRACTION_SCHEMA_CODEX`.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
86
/// Token accounting deserialized from the `usage` object of a Codex
/// `turn.completed` event (see `parse_codex_output`).
///
/// Fields marked `#[serde(default)]` fall back to `0` when the event omits
/// them; `input_tokens` and `output_tokens` must be present for
/// deserialization to succeed.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct CodexUsage {
    /// Tokens consumed by the prompt.
    input_tokens: u64,
    /// Portion of the input reported as cached; `0` when absent.
    #[serde(default)]
    cached_input_tokens: u64,
    /// Tokens produced by the model.
    output_tokens: u64,
    /// Reasoning tokens reported separately; `0` when absent.
    #[serde(default)]
    reasoning_output_tokens: u64,
}
97
/// JSON progress event describing a pipeline phase.
///
/// `run_codex_ingest` emits it with `phase` set to `"validate"` (binary path
/// and version) and `"scan"` (directory and file counts). Fields that are
/// `None` are left out of the serialized object.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    /// Phase identifier, e.g. `"validate"` or `"scan"`.
    phase: &'a str,
    /// Resolved Codex binary path (set in the `validate` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    codex_path: Option<&'a str>,
    /// Raw version string reported by the binary (set in the `validate` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Directory being ingested (set in the `scan` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of files matched by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Files newly inserted into the queue during this run.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Files that were already present in the queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
114
/// JSON progress event describing the outcome of a single file.
///
/// `run_codex_ingest` emits it with `status` set to `"preview"` (dry run),
/// `"done"`, `"failed"` or `"skipped"`. Optional fields that are `None` are
/// left out of the serialized object.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the file the event refers to.
    file: &'a str,
    /// Memory name; empty when no name was produced for the file.
    name: &'a str,
    /// Outcome label (`"preview"`, `"done"`, `"failed"`, `"skipped"`).
    status: &'a str,
    /// Id of the stored memory, present on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of entities returned by the extraction.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships returned by the extraction.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Monetary cost; the code in this file always passes `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    /// Input tokens reported by Codex for this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    input_tokens: Option<u64>,
    /// Output tokens reported by Codex for this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    output_tokens: Option<u64>,
    /// Wall-clock time spent on the file, in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Failure or skip reason.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Zero-based position of the file within the run.
    index: usize,
    /// Total number of files in the run.
    total: usize,
}
140
/// Final JSON line emitted once per run with aggregate counters.
#[derive(Debug, Serialize)]
struct Summary {
    /// Marker field; the code in this file always sets it to `true`.
    summary: bool,
    /// Number of files matched by the scan.
    files_total: usize,
    /// Files stored successfully during this run.
    completed: usize,
    /// Files that ended in the `failed` state during this run.
    failed: usize,
    /// Files skipped, including queue rows already `done` before the loop started.
    skipped: usize,
    /// Sum of entities returned across completed files.
    entities_total: usize,
    /// Sum of relationships returned across completed files.
    rels_total: usize,
    /// Sum of input tokens reported by Codex.
    input_tokens_total: u64,
    /// Sum of output tokens reported by Codex.
    output_tokens_total: u64,
    /// Wall-clock duration of the whole run, in milliseconds.
    elapsed_ms: u64,
}
154
155pub fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
162 if let Some(p) = explicit {
163 if p.exists() {
164 return Ok(p.to_path_buf());
165 }
166 return Err(AppError::Validation(format!(
167 "Codex CLI binary not found at explicit path: {}",
168 p.display()
169 )));
170 }
171
172 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
173 let p = PathBuf::from(&env_path);
174 if p.exists() {
175 return Ok(p);
176 }
177 }
178
179 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
180 if let Some(path_var) = std::env::var_os("PATH") {
181 for dir in std::env::split_paths(&path_var) {
182 let candidate = dir.join(name);
183 if candidate.exists() {
184 return Ok(candidate);
185 }
186 }
187 }
188
189 Err(AppError::Validation(
190 "Codex CLI binary not found in PATH. Install it from https://github.com/openai/codex or specify --codex-binary".to_string(),
191 ))
192}
193
194fn validate_codex_version(binary: &Path) -> Result<String, AppError> {
201 let resolved = which::which(binary).map_err(|_| {
202 AppError::Validation(format!(
203 "executable '{}' not found in PATH; ensure Codex CLI is installed",
204 binary.display()
205 ))
206 })?;
207 let output = Command::new(&resolved)
208 .arg("--version")
209 .stdin(Stdio::null())
210 .stdout(Stdio::piped())
211 .stderr(Stdio::piped())
212 .output()
213 .map_err(AppError::Io)?;
214
215 let raw = String::from_utf8(output.stdout)
216 .map_err(|_| AppError::Validation("codex --version output is not UTF-8".to_string()))?;
217
218 let version_str = raw.trim().to_string();
219
220 let numeric = version_str.split_whitespace().last().unwrap_or("").trim();
222
223 fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
224 let parts: Vec<&str> = s.splitn(3, '.').collect();
225 if parts.len() < 2 {
226 return None;
227 }
228 let major = parts[0].parse::<u64>().ok()?;
229 let minor = parts[1].parse::<u64>().ok()?;
230 let patch = parts
231 .get(2)
232 .and_then(|p| p.parse::<u64>().ok())
233 .unwrap_or(0);
234 Some((major, minor, patch))
235 }
236
237 if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CODEX_VERSION)) {
238 if actual < min {
239 return Err(AppError::Validation(format!(
240 "Codex CLI version {numeric} is below minimum required {MIN_CODEX_VERSION}"
241 )));
242 }
243 }
244
245 Ok(version_str)
246}
247
248fn write_schema_tempfile() -> Result<tempfile::NamedTempFile, AppError> {
254 let mut f = tempfile::NamedTempFile::new().map_err(AppError::Io)?;
255 std::io::Write::write_all(&mut f, EXTRACTION_SCHEMA_CODEX.as_bytes()).map_err(AppError::Io)?;
256 std::io::Write::flush(&mut f).map_err(AppError::Io)?;
257 Ok(f)
258}
259
260fn extract_with_codex(
271 binary: &Path,
272 file_content: &[u8],
273 model: Option<&str>,
274 timeout_secs: u64,
275 schema_file: &Path,
276) -> Result<(ExtractionResult, Option<CodexUsage>), AppError> {
277 use wait_timeout::ChildExt;
278
279 let mut cmd = Command::new(binary);
280
281 cmd.env_clear();
282 for var in &[
283 "PATH",
284 "HOME",
285 "USER",
286 "SHELL",
287 "TERM",
288 "LANG",
289 "XDG_CONFIG_HOME",
290 "XDG_DATA_HOME",
291 "XDG_RUNTIME_DIR",
292 "XDG_CACHE_HOME",
293 "OPENAI_API_KEY",
294 "CODEX_ACCESS_TOKEN",
295 "CODEX_HOME",
296 "TMPDIR",
297 "TMP",
298 "TEMP",
299 "DYLD_FALLBACK_LIBRARY_PATH",
300 ] {
301 if let Ok(val) = std::env::var(var) {
302 cmd.env(var, val);
303 }
304 }
305
306 #[cfg(windows)]
307 for var in &[
308 "LOCALAPPDATA",
309 "APPDATA",
310 "USERPROFILE",
311 "SystemRoot",
312 "COMSPEC",
313 "PATHEXT",
314 ] {
315 if let Ok(val) = std::env::var(var) {
316 cmd.env(var, val);
317 }
318 }
319
320 cmd.arg("exec")
321 .arg("--json")
322 .arg("--output-schema")
323 .arg(schema_file)
324 .arg("--ephemeral")
325 .arg("--skip-git-repo-check")
326 .arg("--sandbox")
327 .arg("read-only")
328 .arg("--ignore-user-config")
329 .arg("--ignore-rules");
330
331 if let Some(m) = model {
332 cmd.arg("-m").arg(m);
333 }
334
335 cmd.arg("-");
337
338 cmd.stdin(Stdio::piped())
339 .stdout(Stdio::piped())
340 .stderr(Stdio::piped());
341
342 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
343 AppError::Io(std::io::Error::new(
344 e.kind(),
345 format!("failed to spawn codex: {e}"),
346 ))
347 })?;
348
349 let file_utf8 = String::from_utf8_lossy(file_content);
351 let stdin_payload = format!("{EXTRACTION_PROMPT}\n\n---\n\nDocument content:\n\n{file_utf8}");
352 let stdin_bytes = stdin_payload.into_bytes();
353
354 let mut child_stdin = child
355 .stdin
356 .take()
357 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
358 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
359 child_stdin.write_all(&stdin_bytes)?;
360 drop(child_stdin);
361 Ok(())
362 });
363
364 let start = std::time::Instant::now();
365 let timeout = std::time::Duration::from_secs(timeout_secs);
366 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
367
368 match status {
369 Some(exit_status) => {
370 stdin_thread
371 .join()
372 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
373 .map_err(AppError::Io)?;
374
375 tracing::debug!(
376 target: "process",
377 exit_code = ?exit_status.code(),
378 elapsed_ms = start.elapsed().as_millis() as u64,
379 "external process completed"
380 );
381
382 let mut stdout_buf = Vec::new();
383 let mut stderr_buf = Vec::new();
384 if let Some(mut out) = child.stdout.take() {
385 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
386 }
387 if let Some(mut err) = child.stderr.take() {
388 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
389 }
390
391 if !exit_status.success() {
392 let stderr_str = String::from_utf8_lossy(&stderr_buf);
393 let stdout_str = String::from_utf8_lossy(&stdout_buf);
394 if let Ok((result, usage)) = parse_codex_output(&stdout_str) {
396 return Ok((result, usage));
397 }
398 if stderr_str.contains("401")
399 || stderr_str.contains("Unauthorized")
400 || stderr_str.contains("auth")
401 {
402 tracing::warn!(
403 target: "ingest",
404 "Codex CLI authentication expired. Re-authenticate with: codex auth login"
405 );
406 }
407 return Err(AppError::Validation(format!(
408 "codex exec exited with code {:?}: {}",
409 exit_status.code(),
410 stderr_str.trim()
411 )));
412 }
413
414 let stdout = String::from_utf8(stdout_buf)
415 .map_err(|_| AppError::Validation("codex exec stdout is not valid UTF-8".into()))?;
416 parse_codex_output(&stdout)
417 }
418 None => {
419 tracing::warn!(target: "ingest", timeout_secs, "codex exec timed out, killing process");
420 let _ = child.kill();
421 let _ = child.wait();
422 let _ = stdin_thread.join();
423 Err(AppError::Validation(format!(
424 "codex exec timed out after {timeout_secs} seconds"
425 )))
426 }
427 }
428}
429
430fn parse_codex_output(stdout: &str) -> Result<(ExtractionResult, Option<CodexUsage>), AppError> {
445 let mut last_agent_text: Option<String> = None;
446 let mut usage: Option<CodexUsage> = None;
447 let mut rate_limited = false;
448 let mut schema_error = false;
449 let mut turn_failed = false;
450 let mut failed_message = String::new();
451
452 for line in stdout.lines() {
453 let line = line.trim();
454 if line.is_empty() {
455 continue;
456 }
457
458 let event: serde_json::Value = match serde_json::from_str(line) {
459 Ok(v) => v,
460 Err(_) => {
461 tracing::warn!(target: "ingest", line, "codex output: skipping malformed JSONL line");
462 continue;
463 }
464 };
465
466 let event_type = match event.get("type").and_then(|t| t.as_str()) {
467 Some(t) => t,
468 None => continue,
469 };
470
471 match event_type {
472 "item.completed" => {
473 if let Some(item) = event.get("item") {
475 if item.get("type").and_then(|t| t.as_str()) == Some("agent_message") {
476 if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
477 last_agent_text = Some(text.to_string());
478 }
479 }
480 }
481 }
482 "turn.completed" => {
483 if let Some(u) = event.get("usage") {
484 if let Ok(parsed) = serde_json::from_value::<CodexUsage>(u.clone()) {
485 usage = Some(parsed);
486 }
487 }
488 }
489 "turn.failed" => {
490 turn_failed = true;
491 if let Some(err) = event.get("error") {
492 let msg = err
493 .get("message")
494 .and_then(|m| m.as_str())
495 .unwrap_or("unknown error");
496 failed_message = msg.to_string();
497 if msg.contains("rate_limit")
498 || msg.contains("429")
499 || msg.contains("Too Many Requests")
500 {
501 rate_limited = true;
502 }
503 }
504 }
505 "error" => {
506 if let Some(msg) = event.get("message").and_then(|m| m.as_str()) {
507 if msg.contains("invalid_json_schema") || msg.contains("schema") {
508 schema_error = true;
509 }
510 tracing::warn!(target: "ingest", error_msg = msg, "codex error event received");
511 }
512 }
513 _ => {
514 }
516 }
517 }
518
519 if rate_limited {
520 return Err(AppError::RateLimited {
521 detail: failed_message,
522 });
523 }
524
525 if schema_error {
526 return Err(AppError::Validation(
527 "codex rejected the output schema (invalid_json_schema)".to_string(),
528 ));
529 }
530
531 if turn_failed {
532 return Err(AppError::Validation(format!(
533 "codex turn failed: {failed_message}"
534 )));
535 }
536
537 let text = last_agent_text.ok_or_else(|| {
538 AppError::Validation("codex output contained no agent_message item".to_string())
539 })?;
540
541 let extraction: ExtractionResult = serde_json::from_str(&text).map_err(|e| {
542 AppError::Validation(format!(
543 "failed to parse codex agent_message as ExtractionResult: {e}. text={text}"
544 ))
545 })?;
546
547 Ok((extraction, usage))
548}
549
550use crate::output::emit_json_line as emit_json;
551
552fn collect_matching_files(
554 dir: &Path,
555 pattern: &str,
556 recursive: bool,
557 max_files: usize,
558) -> Result<Vec<PathBuf>, AppError> {
559 let mut files = Vec::new();
560 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
561 files.sort_unstable();
562
563 if files.len() > max_files {
564 return Err(AppError::Validation(format!(
565 "found {} files, exceeds --max-files cap of {}",
566 files.len(),
567 max_files
568 )));
569 }
570
571 Ok(files)
572}
573
574fn open_queue_db(path: &str) -> Result<Connection, AppError> {
576 let conn = Connection::open(path)?;
577
578 conn.execute_batch(
579 "PRAGMA journal_mode=WAL;
580 CREATE TABLE IF NOT EXISTS queue (
581 id INTEGER PRIMARY KEY AUTOINCREMENT,
582 file_path TEXT NOT NULL UNIQUE,
583 name TEXT,
584 status TEXT NOT NULL DEFAULT 'pending',
585 memory_id INTEGER,
586 entities INTEGER DEFAULT 0,
587 rels INTEGER DEFAULT 0,
588 error TEXT,
589 input_tokens INTEGER DEFAULT 0,
590 output_tokens INTEGER DEFAULT 0,
591 attempt INTEGER DEFAULT 0,
592 elapsed_ms INTEGER,
593 created_at TEXT DEFAULT (datetime('now')),
594 done_at TEXT
595 );
596 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
597 )?;
598
599 Ok(conn)
600}
601
602pub fn run_codex_ingest(args: &IngestArgs) -> Result<(), AppError> {
608 let started = Instant::now();
609
610 if !args.dir.exists() {
611 return Err(AppError::Validation(format!(
612 "directory not found: {}",
613 args.dir.display()
614 )));
615 }
616
617 let codex_binary = find_codex_binary(args.codex_binary.as_deref())?;
619 let version = validate_codex_version(&codex_binary)?;
620 tracing::info!(
621 target: "ingest",
622 binary = %codex_binary.display(),
623 version = %version,
624 "Codex CLI binary validated"
625 );
626
627 emit_json(&PhaseEvent {
628 phase: "validate",
629 codex_path: codex_binary.to_str(),
630 version: Some(&version),
631 dir: None,
632 files_total: None,
633 files_new: None,
634 files_existing: None,
635 });
636
637 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
639
640 let queue_conn = open_queue_db(&args.queue_db)?;
641
642 if args.resume {
643 let reset = queue_conn
644 .execute(
645 "UPDATE queue SET status='pending' WHERE status='processing'",
646 [],
647 )
648 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
649 if reset > 0 {
650 tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
651 }
652 }
653
654 if args.retry_failed {
655 let count = queue_conn
656 .execute(
657 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
658 [],
659 )
660 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
661 tracing::info!(target: "ingest", count, "retrying failed files");
662 }
663
664 if !args.resume && !args.retry_failed {
665 queue_conn
666 .execute("DELETE FROM queue", [])
667 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
668 }
669
670 let mut new_count = 0usize;
671 let mut existing_count = 0usize;
672
673 if !args.retry_failed {
674 for file in &files {
675 let file_str = file.to_string_lossy().into_owned();
676 let inserted = queue_conn
677 .execute(
678 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
679 rusqlite::params![file_str],
680 )
681 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
682 if inserted > 0 {
683 new_count += 1;
684 } else {
685 existing_count += 1;
686 }
687 }
688 }
689
690 emit_json(&PhaseEvent {
691 phase: "scan",
692 codex_path: None,
693 version: None,
694 dir: args.dir.to_str(),
695 files_total: Some(files.len()),
696 files_new: Some(new_count),
697 files_existing: Some(existing_count),
698 });
699
700 if args.dry_run {
701 for (idx, file) in files.iter().enumerate() {
702 let (name, _truncated, _orig) =
703 super::ingest::derive_kebab_name(file, args.max_name_length);
704 emit_json(&FileEvent {
705 file: &file.to_string_lossy(),
706 name: &name,
707 status: "preview",
708 memory_id: None,
709 entities: None,
710 rels: None,
711 cost_usd: None,
712 input_tokens: None,
713 output_tokens: None,
714 elapsed_ms: None,
715 error: None,
716 index: idx,
717 total: files.len(),
718 });
719 }
720 emit_json(&Summary {
721 summary: true,
722 files_total: files.len(),
723 completed: 0,
724 failed: 0,
725 skipped: 0,
726 entities_total: 0,
727 rels_total: 0,
728 input_tokens_total: 0,
729 output_tokens_total: 0,
730 elapsed_ms: started.elapsed().as_millis() as u64,
731 });
732 if !args.keep_queue {
733 let _ = std::fs::remove_file(&args.queue_db);
734 }
735 return Ok(());
736 }
737
738 let paths = AppPaths::resolve(args.db.as_deref())?;
740 ensure_db_ready(&paths)?;
741 let conn = open_rw(&paths.db)?;
742 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
743 let memory_type_str = args.r#type.as_str().to_string();
744
745 let schema_tempfile = write_schema_tempfile()?;
747 let schema_path = schema_tempfile.path().to_path_buf();
748
749 let mut completed = 0usize;
750 let mut failed = 0usize;
751 let skipped_initial: usize = queue_conn
752 .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
753 r.get::<_, usize>(0)
754 })
755 .unwrap_or(0);
756 let mut skipped = skipped_initial;
757 let mut entities_total = 0usize;
758 let mut rels_total = 0usize;
759 let mut input_tokens_total = 0u64;
760 let mut output_tokens_total = 0u64;
761 let total = files.len();
762
763 let mut backoff_secs = args.rate_limit_wait;
764 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
765
766 loop {
767 if crate::shutdown_requested() {
768 tracing::info!(target: "ingest", "shutdown requested, stopping before next file");
769 break;
770 }
771
772 let pending: Option<(i64, String)> = queue_conn
773 .query_row(
774 "UPDATE queue SET status='processing', attempt=attempt+1 \
775 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
776 RETURNING id, file_path",
777 [],
778 |row| Ok((row.get(0)?, row.get(1)?)),
779 )
780 .ok();
781
782 let (queue_id, file_path) = match pending {
783 Some(p) => p,
784 None => break,
785 };
786
787 let file_started = Instant::now();
788
789 const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
791 if let Ok(meta) = std::fs::metadata(&file_path) {
792 if meta.len() > MAX_FILE_SIZE {
793 let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
794 let _ = queue_conn.execute(
795 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
796 rusqlite::params![err_msg, queue_id],
797 );
798 let current_index = completed + failed + skipped;
799 failed += 1;
800 emit_json(&FileEvent {
801 file: &file_path,
802 name: "",
803 status: "failed",
804 memory_id: None,
805 entities: None,
806 rels: None,
807 cost_usd: None,
808 input_tokens: None,
809 output_tokens: None,
810 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
811 error: Some(&err_msg),
812 index: current_index,
813 total,
814 });
815 if args.fail_fast {
816 break;
817 }
818 continue;
819 }
820 }
821
822 let file_content = match std::fs::read(&file_path) {
823 Ok(c) => c,
824 Err(e) => {
825 let err_msg = format!("IO error: {e}");
826 let _ = queue_conn.execute(
827 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
828 rusqlite::params![err_msg, queue_id],
829 );
830 let current_index = completed + failed + skipped;
831 failed += 1;
832 emit_json(&FileEvent {
833 file: &file_path,
834 name: "",
835 status: "failed",
836 memory_id: None,
837 entities: None,
838 rels: None,
839 cost_usd: None,
840 input_tokens: None,
841 output_tokens: None,
842 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
843 error: Some(&err_msg),
844 index: current_index,
845 total,
846 });
847 if args.fail_fast {
848 break;
849 }
850 continue;
851 }
852 };
853
854 if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
856 let err_msg = format!(
857 "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
858 crate::constants::MAX_MEMORY_BODY_LEN,
859 file_content.len()
860 );
861 tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
862 let _ = queue_conn.execute(
863 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
864 rusqlite::params![err_msg, queue_id],
865 );
866 let current_index = completed + failed + skipped;
867 skipped += 1;
868 emit_json(&FileEvent {
869 file: &file_path,
870 name: "",
871 status: "skipped",
872 memory_id: None,
873 entities: None,
874 rels: None,
875 cost_usd: None,
876 input_tokens: None,
877 output_tokens: None,
878 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
879 error: Some(&err_msg),
880 index: current_index,
881 total,
882 });
883 continue;
884 }
885
886 let max_extract_attempts: u32 = 2;
888 let mut extraction_result: Option<(ExtractionResult, Option<CodexUsage>)> = None;
889 let mut last_extract_err: Option<String> = None;
890 let mut last_was_rate_limited = false;
891
892 for attempt in 1..=max_extract_attempts {
893 match extract_with_codex(
894 &codex_binary,
895 &file_content,
896 args.codex_model.as_deref(),
897 args.codex_timeout,
898 &schema_path,
899 ) {
900 Ok(result) => {
901 extraction_result = Some(result);
902 break;
903 }
904 Err(ref e) if matches!(e, AppError::RateLimited { .. }) => {
905 last_extract_err = Some(format!("{e}"));
906 last_was_rate_limited = true;
907 break;
908 }
909 Err(e) => {
910 let msg = format!("{e}");
911 if attempt < max_extract_attempts {
912 let cold_start_delay = 2 * attempt as u64;
913 tracing::warn!(
914 target: "ingest",
915 attempt,
916 delay_secs = cold_start_delay,
917 error = %msg,
918 "codex extraction failed, retrying"
919 );
920 std::thread::sleep(std::time::Duration::from_secs(cold_start_delay));
921 }
922 last_extract_err = Some(msg);
923 }
924 }
925 }
926
927 if let Some((extraction, usage)) = extraction_result {
928 backoff_secs = args.rate_limit_wait;
929
930 let in_tok = usage.as_ref().map(|u| u.input_tokens).unwrap_or(0);
931 let out_tok = usage.as_ref().map(|u| u.output_tokens).unwrap_or(0);
932
933 let name = &extraction.name;
934 let ent_count = extraction.entities.len();
935 let rel_count = extraction.relationships.len();
936
937 let new_entities: Vec<NewEntity> = extraction
938 .entities
939 .iter()
940 .filter_map(|e| match e.entity_type.parse::<EntityType>() {
941 Ok(et) => Some(NewEntity {
942 name: e.name.clone(),
943 entity_type: et,
944 description: None,
945 }),
946 Err(_) => {
947 tracing::warn!(
948 target: "ingest",
949 entity = %e.name,
950 entity_type = %e.entity_type,
951 "entity type not recognized, skipping"
952 );
953 None
954 }
955 })
956 .collect();
957
958 let new_relationships: Vec<NewRelationship> = extraction
959 .relationships
960 .iter()
961 .map(|r| NewRelationship {
962 source: r.source.clone(),
963 target: r.target.clone(),
964 relation: crate::parsers::normalize_relation(&r.relation),
965 strength: r.strength,
966 description: None,
967 })
968 .collect();
969
970 let body_str = String::from_utf8_lossy(&file_content);
971 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
972 let new_memory = NewMemory {
973 name: name.clone(),
974 namespace: namespace.clone(),
975 memory_type: memory_type_str.clone(),
976 description: extraction.description.clone(),
977 body: body_str.to_string(),
978 body_hash,
979 session_id: None,
980 source: "agent".to_string(),
981 metadata: serde_json::Value::Object(serde_json::Map::new()),
982 };
983
984 let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
986 Some((existing_id, is_deleted)) => {
987 if is_deleted {
988 memories::clear_deleted_at(&conn, existing_id)?;
989 }
990 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
991 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
992 rusqlite::params![existing_id],
993 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
994 )?;
995 memories::update(&conn, existing_id, &new_memory, None)?;
996 memories::sync_fts_after_update(
997 &conn,
998 existing_id,
999 &old_name,
1000 &old_desc,
1001 &old_body,
1002 &new_memory.name,
1003 &new_memory.description,
1004 &new_memory.body,
1005 )?;
1006 tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
1007 existing_id
1008 }
1009 None => match memories::insert(&conn, &new_memory) {
1010 Ok(id) => id,
1011 Err(e) => {
1012 let err_msg = format!("{e}");
1013 let _ = queue_conn.execute(
1014 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1015 rusqlite::params![err_msg, queue_id],
1016 );
1017 let current_index = completed + failed + skipped;
1018 failed += 1;
1019 emit_json(&FileEvent {
1020 file: &file_path,
1021 name,
1022 status: "failed",
1023 memory_id: None,
1024 entities: None,
1025 rels: None,
1026 cost_usd: None,
1027 input_tokens: Some(in_tok),
1028 output_tokens: Some(out_tok),
1029 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1030 error: Some(&err_msg),
1031 index: current_index,
1032 total,
1033 });
1034 input_tokens_total += in_tok;
1035 output_tokens_total += out_tok;
1036 if args.fail_fast {
1037 break;
1038 }
1039 continue;
1040 }
1041 },
1042 };
1043
1044 for ent in &new_entities {
1045 if let Ok(eid) = entities::upsert_entity(&conn, &namespace, ent) {
1046 let _ = entities::link_memory_entity(&conn, memory_id, eid);
1047 }
1048 }
1049 for rel in &new_relationships {
1050 crate::parsers::warn_if_non_canonical(&rel.relation);
1051 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
1052 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
1053 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1054 let _ = conn.execute(
1055 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1056 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1057 );
1058 }
1059 }
1060
1061 let _ = queue_conn.execute(
1062 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, \
1063 input_tokens=?5, output_tokens=?6, elapsed_ms=?7, done_at=datetime('now') WHERE id=?8",
1064 rusqlite::params![
1065 name,
1066 memory_id,
1067 ent_count,
1068 rel_count,
1069 in_tok,
1070 out_tok,
1071 file_started.elapsed().as_millis() as i64,
1072 queue_id
1073 ],
1074 );
1075
1076 let current_index = completed + failed + skipped;
1077 completed += 1;
1078 entities_total += ent_count;
1079 rels_total += rel_count;
1080 input_tokens_total += in_tok;
1081 output_tokens_total += out_tok;
1082
1083 emit_json(&FileEvent {
1084 file: &file_path,
1085 name,
1086 status: "done",
1087 memory_id: Some(memory_id),
1088 entities: Some(ent_count),
1089 rels: Some(rel_count),
1090 cost_usd: None,
1091 input_tokens: Some(in_tok),
1092 output_tokens: Some(out_tok),
1093 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1094 error: None,
1095 index: current_index,
1096 total,
1097 });
1098 } else if let Some(ref err_str) = last_extract_err {
1099 if last_was_rate_limited {
1100 if crate::retry::is_kill_switch_active() {
1101 tracing::warn!(target: "ingest", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1102 } else if std::time::Instant::now() >= rate_limit_deadline {
1103 tracing::error!(target: "ingest", "rate-limit retry deadline (1h) exhausted");
1104 } else {
1105 let half = backoff_secs / 2;
1106 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1107 let actual_wait = half + jitter;
1108 tracing::warn!(target: "ingest", delay_secs = actual_wait, error_kind = "rate_limited", "Codex rate limited, backing off");
1109 let _ = queue_conn.execute(
1110 "UPDATE queue SET status='pending' WHERE id=?1",
1111 rusqlite::params![queue_id],
1112 );
1113 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1114 backoff_secs = (backoff_secs * 2).min(900);
1115 continue;
1116 }
1117 } else {
1118 let _ = queue_conn.execute(
1119 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1120 rusqlite::params![err_str, queue_id],
1121 );
1122 let current_index = completed + failed + skipped;
1123 failed += 1;
1124 emit_json(&FileEvent {
1125 file: &file_path,
1126 name: "",
1127 status: "failed",
1128 memory_id: None,
1129 entities: None,
1130 rels: None,
1131 cost_usd: None,
1132 input_tokens: None,
1133 output_tokens: None,
1134 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1135 error: Some(err_str),
1136 index: current_index,
1137 total,
1138 });
1139 if args.fail_fast {
1140 break;
1141 }
1142 }
1143 }
1144 }
1145
1146 let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);");
1148
1149 emit_json(&Summary {
1151 summary: true,
1152 files_total: total,
1153 completed,
1154 failed,
1155 skipped,
1156 entities_total,
1157 rels_total,
1158 input_tokens_total,
1159 output_tokens_total,
1160 elapsed_ms: started.elapsed().as_millis() as u64,
1161 });
1162
1163 if !args.keep_queue && failed == 0 {
1164 let _ = std::fs::remove_file(&args.queue_db);
1165 }
1166
1167 Ok(())
1168}
1169
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one `item.completed` JSONL event whose item is an `agent_message` carrying `text`.
    ///
    /// `text` is JSON-encoded first so quotes inside it are escaped within the event.
    fn make_agent_message_event(text: &str) -> String {
        let encoded_text = serde_json::to_string(text).unwrap();
        format!(
            r#"{{"type":"item.completed","item":{{"id":"item_0","type":"agent_message","text":{encoded_text}}}}}"#
        )
    }

    /// Builds one `turn.completed` JSONL event reporting the given token counts.
    fn make_usage_event(input: u64, output: u64) -> String {
        let usage = format!(r#"{{"input_tokens":{input},"output_tokens":{output}}}"#);
        format!(r#"{{"type":"turn.completed","usage":{usage}}}"#)
    }

    /// Returns an extraction payload with one entity and one relationship.
    fn valid_extraction_json() -> String {
        String::from(
            r#"{"name":"test-module","description":"A test module for unit testing purposes","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-module","relation":"applies-to","strength":0.8}]}"#,
        )
    }

    /// A well-formed stream yields both the extraction and the token usage.
    #[test]
    fn test_parse_codex_output_valid() {
        let jsonl = [
            String::from(r#"{"type":"thread.started","thread_id":"t1"}"#),
            make_agent_message_event(&valid_extraction_json()),
            make_usage_event(100, 50),
        ]
        .join("\n");

        let (extraction, usage) = parse_codex_output(&jsonl).expect("parse must succeed");
        assert_eq!(extraction.name, "test-module");
        assert_eq!(extraction.entities.len(), 1);
        assert_eq!(extraction.relationships.len(), 1);

        let tokens = usage.expect("usage must be present");
        assert_eq!(tokens.input_tokens, 100);
        assert_eq!(tokens.output_tokens, 50);
    }

    /// A `turn.failed` event becomes an error that carries the upstream message.
    #[test]
    fn test_parse_codex_output_turn_failed() {
        let jsonl = [
            r#"{"type":"thread.started","thread_id":"t1"}"#,
            r#"{"type":"turn.failed","error":{"message":"model error occurred"}}"#,
        ]
        .join("\n");

        let err = parse_codex_output(&jsonl).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("turn failed"),
            "expected 'turn failed' in: {msg}"
        );
        assert!(msg.contains("model error occurred"));
    }

    /// A `turn.failed` event with rate-limit wording maps to `AppError::RateLimited`.
    #[test]
    fn test_parse_codex_output_rate_limit() {
        let jsonl = r#"{"type":"turn.failed","error":{"message":"rate_limit exceeded, 429 Too Many Requests"}}"#;

        let err = parse_codex_output(jsonl).unwrap_err();
        let is_rate_limited = matches!(err, AppError::RateLimited { .. });
        assert!(
            is_rate_limited,
            "expected AppError::RateLimited, got: {err}"
        );
    }

    /// A top-level `error` event about the JSON schema is reported as a schema-related error.
    #[test]
    fn test_parse_codex_output_schema_error() {
        let jsonl = r#"{"type":"error","message":"invalid_json_schema: additional properties not allowed"}"#;

        let err = parse_codex_output(jsonl).unwrap_err();
        let msg = err.to_string();
        let mentions_schema = ["invalid_json_schema", "schema"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(mentions_schema, "expected schema error in: {msg}");
    }

    /// The embedded schema constant must parse as JSON.
    #[test]
    fn test_extraction_schema_codex_valid_json() {
        serde_json::from_str::<serde_json::Value>(EXTRACTION_SCHEMA_CODEX)
            .expect("schema must be valid JSON");
    }

    /// The root object and both array item objects must set `additionalProperties: false`.
    #[test]
    fn test_extraction_schema_codex_has_additional_properties_false() {
        let schema = serde_json::from_str::<serde_json::Value>(EXTRACTION_SCHEMA_CODEX)
            .expect("schema must be valid JSON");
        let properties = &schema["properties"];

        // Each entry pairs a schema node with the failure message used when it is not `false`.
        let checks = [
            (
                &schema["additionalProperties"],
                "root must have additionalProperties: false",
            ),
            (
                &properties["entities"]["items"]["additionalProperties"],
                "entity items must have additionalProperties: false",
            ),
            (
                &properties["relationships"]["items"]["additionalProperties"],
                "relationship items must have additionalProperties: false",
            ),
        ];

        for (node, why) in &checks {
            assert_eq!(node.as_bool(), Some(false), "{why}");
        }
    }

    /// When several agent messages are present, the last one supplies the result.
    #[test]
    fn test_parse_codex_output_last_agent_message_wins() {
        let first_text = r#"{"name":"first-result","description":"First result should be ignored","entities":[],"relationships":[]}"#;
        let second_text = r#"{"name":"final-result","description":"Final result wins over earlier ones","entities":[{"name":"final-entity","entity_type":"concept"}],"relationships":[]}"#;

        let jsonl = [
            String::from(r#"{"type":"thread.started","thread_id":"t1"}"#),
            make_agent_message_event(first_text),
            make_agent_message_event(second_text),
            make_usage_event(200, 80),
        ]
        .join("\n");

        let (extraction, _) = parse_codex_output(&jsonl).expect("parse must succeed");
        assert_eq!(extraction.name, "final-result", "last agent_message should win");
        assert_eq!(extraction.entities.len(), 1);
    }

    /// Lines that are not valid JSON do not prevent the valid events from being parsed.
    #[test]
    fn test_parse_codex_output_skips_malformed_lines() {
        let jsonl = [
            String::from("not json at all"),
            make_agent_message_event(&valid_extraction_json()),
            String::from("{broken"),
            make_usage_event(10, 5),
        ]
        .join("\n");

        let (extraction, _) =
            parse_codex_output(&jsonl).expect("malformed lines must be skipped");
        assert_eq!(extraction.name, "test-module");
    }
}