1use crate::commands::ingest::IngestArgs;
13use crate::commands::ingest_claude::ExtractionResult;
14use crate::entity_type::EntityType;
15use crate::errors::AppError;
16use crate::paths::AppPaths;
17use crate::storage::connection::{ensure_db_ready, open_rw};
18use crate::storage::entities::{self, NewEntity, NewRelationship};
19use crate::storage::memories::{self, NewMemory};
20
21use rusqlite::Connection;
22use serde::{Deserialize, Serialize};
23use std::io::Write;
24use std::path::{Path, PathBuf};
25use std::process::{Command, Stdio};
26use std::time::Instant;
27
/// Oldest Codex CLI release accepted by `validate_codex_version`; anything older is rejected
/// before ingestion starts.
const MIN_CODEX_VERSION: &str = "0.120.0";
29
/// JSON Schema written to a temp file and handed to `codex exec` to constrain the final agent
/// message.
///
/// The shape matches what `parse_codex_output` deserializes into `ExtractionResult`:
/// - a `name` and a `description`;
/// - `entities`, each with a `name` and an `entity_type` drawn from a fixed enum;
/// - `relationships`, each with `source`, `target`, a `relation` drawn from a fixed enum, and a
///   `strength` in `[0, 1]`.
///
/// Every object sets `additionalProperties: false` and lists all of its properties as
/// `required`.
const EXTRACTION_SCHEMA_CODEX: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
72
/// Instructions prepended (on stdin) to every document sent to `codex exec`.
///
/// Requests a kebab-case name, a one-sentence description, domain entities and typed,
/// weighted relationships. The relation list here must stay in sync with the `relation` enum
/// in `EXTRACTION_SCHEMA_CODEX`.
const EXTRACTION_PROMPT: &str = concat!(
    "You are a knowledge graph entity extractor. Given a document, extract:\n",
    "1. A short kebab-case name (max 60 chars) capturing the document's main topic\n",
    "2. A one-sentence description (10-20 words) summarizing the key insight\n",
    "3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n",
    "4. Typed relationships between entities with strength scores\n\n",
    "Rules:\n",
    "- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n",
    "- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n",
    "- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n",
    "- NEVER use 'mentions' as relationship type\n",
    "- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n",
    "- Prefer fewer high-quality entities over many low-quality ones\n",
    "- Description must answer: What is this about and WHY does it matter?"
);
86
87#[derive(Debug, Clone, Deserialize, Serialize)]
89struct CodexUsage {
90 input_tokens: u64,
91 #[serde(default)]
92 cached_input_tokens: u64,
93 output_tokens: u64,
94 #[serde(default)]
95 reasoning_output_tokens: u64,
96}
97
98#[derive(Debug, Serialize)]
99struct PhaseEvent<'a> {
100 phase: &'a str,
101 #[serde(skip_serializing_if = "Option::is_none")]
102 codex_path: Option<&'a str>,
103 #[serde(skip_serializing_if = "Option::is_none")]
104 version: Option<&'a str>,
105 #[serde(skip_serializing_if = "Option::is_none")]
106 dir: Option<&'a str>,
107 #[serde(skip_serializing_if = "Option::is_none")]
108 files_total: Option<usize>,
109 #[serde(skip_serializing_if = "Option::is_none")]
110 files_new: Option<usize>,
111 #[serde(skip_serializing_if = "Option::is_none")]
112 files_existing: Option<usize>,
113}
114
115#[derive(Debug, Serialize)]
116struct FileEvent<'a> {
117 file: &'a str,
118 name: &'a str,
119 status: &'a str,
120 #[serde(skip_serializing_if = "Option::is_none")]
121 memory_id: Option<i64>,
122 #[serde(skip_serializing_if = "Option::is_none")]
123 entities: Option<usize>,
124 #[serde(skip_serializing_if = "Option::is_none")]
125 rels: Option<usize>,
126 #[serde(skip_serializing_if = "Option::is_none")]
128 cost_usd: Option<f64>,
129 #[serde(skip_serializing_if = "Option::is_none")]
130 input_tokens: Option<u64>,
131 #[serde(skip_serializing_if = "Option::is_none")]
132 output_tokens: Option<u64>,
133 #[serde(skip_serializing_if = "Option::is_none")]
134 elapsed_ms: Option<u64>,
135 #[serde(skip_serializing_if = "Option::is_none")]
136 error: Option<&'a str>,
137 index: usize,
138 total: usize,
139}
140
141#[derive(Debug, Serialize)]
142struct Summary {
143 summary: bool,
144 files_total: usize,
145 completed: usize,
146 failed: usize,
147 skipped: usize,
148 entities_total: usize,
149 rels_total: usize,
150 input_tokens_total: u64,
151 output_tokens_total: u64,
152 elapsed_ms: u64,
153}
154
155pub fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
162 if let Some(p) = explicit {
163 if p.exists() {
164 return Ok(p.to_path_buf());
165 }
166 return Err(AppError::Validation(format!(
167 "Codex CLI binary not found at explicit path: {}",
168 p.display()
169 )));
170 }
171
172 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
173 let p = PathBuf::from(&env_path);
174 if p.exists() {
175 return Ok(p);
176 }
177 }
178
179 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
180 if let Some(path_var) = std::env::var_os("PATH") {
181 for dir in std::env::split_paths(&path_var) {
182 let candidate = dir.join(name);
183 if candidate.exists() {
184 return Ok(candidate);
185 }
186 }
187 }
188
189 Err(AppError::Validation(
190 "Codex CLI binary not found in PATH. Install it from https://github.com/openai/codex or specify --codex-binary".to_string(),
191 ))
192}
193
194fn validate_codex_version(binary: &Path) -> Result<String, AppError> {
201 let resolved = which::which(binary).map_err(|_| {
202 AppError::Validation(format!(
203 "executable '{}' not found in PATH; ensure Codex CLI is installed",
204 binary.display()
205 ))
206 })?;
207 let output = Command::new(&resolved)
208 .arg("--version")
209 .stdin(Stdio::null())
210 .stdout(Stdio::piped())
211 .stderr(Stdio::piped())
212 .output()
213 .map_err(AppError::Io)?;
214
215 let raw = String::from_utf8(output.stdout)
216 .map_err(|_| AppError::Validation("codex --version output is not UTF-8".to_string()))?;
217
218 let version_str = raw.trim().to_string();
219
220 let numeric = version_str.split_whitespace().last().unwrap_or("").trim();
222
223 fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
224 let parts: Vec<&str> = s.splitn(3, '.').collect();
225 if parts.len() < 2 {
226 return None;
227 }
228 let major = parts[0].parse::<u64>().ok()?;
229 let minor = parts[1].parse::<u64>().ok()?;
230 let patch = parts
231 .get(2)
232 .and_then(|p| p.parse::<u64>().ok())
233 .unwrap_or(0);
234 Some((major, minor, patch))
235 }
236
237 if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CODEX_VERSION)) {
238 if actual < min {
239 return Err(AppError::Validation(format!(
240 "Codex CLI version {numeric} is below minimum required {MIN_CODEX_VERSION}"
241 )));
242 }
243 }
244
245 Ok(version_str)
246}
247
248fn write_schema_tempfile() -> Result<tempfile::NamedTempFile, AppError> {
254 let mut f = tempfile::NamedTempFile::new().map_err(AppError::Io)?;
255 std::io::Write::write_all(&mut f, EXTRACTION_SCHEMA_CODEX.as_bytes()).map_err(AppError::Io)?;
256 std::io::Write::flush(&mut f).map_err(AppError::Io)?;
257 Ok(f)
258}
259
260fn extract_with_codex(
271 binary: &Path,
272 file_content: &[u8],
273 model: Option<&str>,
274 timeout_secs: u64,
275 schema_file: &Path,
276) -> Result<(ExtractionResult, Option<CodexUsage>), AppError> {
277 use wait_timeout::ChildExt;
278
279 let _ = timeout_secs; let _ = file_content; let _ = schema_file; let prompt = String::new(); let mut cmd = crate::commands::codex_spawn::build_codex_command(
288 &crate::commands::codex_spawn::CodexSpawnArgs {
289 binary,
290 prompt: &prompt,
291 json_schema: "", input_text: "",
293 model,
294 timeout_secs,
295 schema_path: schema_file.to_path_buf(),
296 },
297 )?;
298
299 let _ = std::fs::write(
306 schema_file,
307 crate::commands::ingest_codex::EXTRACTION_SCHEMA_CODEX,
308 );
309
310 cmd.stdin(Stdio::piped())
311 .stdout(Stdio::piped())
312 .stderr(Stdio::piped());
313
314 let mut child = super::claude_runner::spawn_with_memory_limit(&mut cmd).map_err(|e| {
315 AppError::Io(std::io::Error::new(
316 e.kind(),
317 format!("failed to spawn codex: {e}"),
318 ))
319 })?;
320
321 let file_utf8 = String::from_utf8(file_content.to_vec())
323 .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
324 let stdin_payload = format!("{EXTRACTION_PROMPT}\n\n---\n\nDocument content:\n\n{file_utf8}");
325 let stdin_bytes = stdin_payload.into_bytes();
326
327 let mut child_stdin = child
328 .stdin
329 .take()
330 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
331 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
332 child_stdin.write_all(&stdin_bytes)?;
333 drop(child_stdin);
334 Ok(())
335 });
336
337 let start = std::time::Instant::now();
338 let timeout = std::time::Duration::from_secs(timeout_secs);
339 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
340
341 match status {
342 Some(exit_status) => {
343 stdin_thread
344 .join()
345 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
346 .map_err(AppError::Io)?;
347
348 tracing::debug!(
349 target: "process",
350 exit_code = ?exit_status.code(),
351 elapsed_ms = start.elapsed().as_millis() as u64,
352 "external process completed"
353 );
354
355 let mut stdout_buf = Vec::new();
356 let mut stderr_buf = Vec::new();
357 if let Some(mut out) = child.stdout.take() {
358 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
359 }
360 if let Some(mut err) = child.stderr.take() {
361 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
362 }
363
364 if !exit_status.success() {
365 let stderr_str = String::from_utf8_lossy(&stderr_buf);
366 let stdout_str = String::from_utf8_lossy(&stdout_buf);
367 if let Ok((result, usage)) = parse_codex_output(&stdout_str) {
369 return Ok((result, usage));
370 }
371 if stderr_str.contains("401")
372 || stderr_str.contains("Unauthorized")
373 || stderr_str.contains("auth")
374 {
375 tracing::warn!(
376 target: "ingest",
377 "Codex CLI authentication expired. Re-authenticate with: codex auth login"
378 );
379 }
380 return Err(AppError::Validation(format!(
381 "codex exec exited with code {:?}: {}",
382 exit_status.code(),
383 stderr_str.trim()
384 )));
385 }
386
387 let stdout = String::from_utf8(stdout_buf)
388 .map_err(|_| AppError::Validation("codex exec stdout is not valid UTF-8".into()))?;
389 parse_codex_output(&stdout)
390 }
391 None => {
392 tracing::warn!(target: "ingest", timeout_secs, "codex exec timed out, killing process");
393 let _ = child.kill();
394 let _ = child.wait();
395 let _ = stdin_thread.join();
396 Err(AppError::Validation(format!(
397 "codex exec timed out after {timeout_secs} seconds"
398 )))
399 }
400 }
401}
402
403fn parse_codex_output(stdout: &str) -> Result<(ExtractionResult, Option<CodexUsage>), AppError> {
418 let mut last_agent_text: Option<String> = None;
419 let mut usage: Option<CodexUsage> = None;
420 let mut rate_limited = false;
421 let mut schema_error = false;
422 let mut turn_failed = false;
423 let mut failed_message = String::new();
424
425 for line in stdout.lines() {
426 let line = line.trim();
427 if line.is_empty() {
428 continue;
429 }
430
431 let event: serde_json::Value = match serde_json::from_str(line) {
432 Ok(v) => v,
433 Err(_) => {
434 tracing::warn!(target: "ingest", line, "codex output: skipping malformed JSONL line");
435 continue;
436 }
437 };
438
439 let event_type = match event.get("type").and_then(|t| t.as_str()) {
440 Some(t) => t,
441 None => continue,
442 };
443
444 match event_type {
445 "item.completed" => {
446 if let Some(item) = event.get("item") {
448 if item.get("type").and_then(|t| t.as_str()) == Some("agent_message") {
449 if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
450 last_agent_text = Some(text.to_string());
451 }
452 }
453 }
454 }
455 "turn.completed" => {
456 if let Some(u) = event.get("usage") {
457 if let Ok(parsed) = serde_json::from_value::<CodexUsage>(u.clone()) {
458 usage = Some(parsed);
459 }
460 }
461 }
462 "turn.failed" => {
463 turn_failed = true;
464 if let Some(err) = event.get("error") {
465 let msg = err
466 .get("message")
467 .and_then(|m| m.as_str())
468 .unwrap_or("unknown error");
469 failed_message = msg.to_string();
470 if msg.contains("rate_limit")
471 || msg.contains("429")
472 || msg.contains("Too Many Requests")
473 {
474 rate_limited = true;
475 }
476 }
477 }
478 "error" => {
479 if let Some(msg) = event.get("message").and_then(|m| m.as_str()) {
480 if msg.contains("invalid_json_schema") || msg.contains("schema") {
481 schema_error = true;
482 }
483 tracing::warn!(target: "ingest", error_msg = msg, "codex error event received");
484 }
485 }
486 _ => {
487 }
489 }
490 }
491
492 if rate_limited {
493 return Err(AppError::RateLimited {
494 detail: failed_message,
495 });
496 }
497
498 if schema_error {
499 return Err(AppError::Validation(
500 "codex rejected the output schema (invalid_json_schema)".to_string(),
501 ));
502 }
503
504 if turn_failed {
505 return Err(AppError::Validation(format!(
506 "codex turn failed: {failed_message}"
507 )));
508 }
509
510 let text = last_agent_text.ok_or_else(|| {
511 AppError::Validation("codex output contained no agent_message item".to_string())
512 })?;
513
514 let extraction: ExtractionResult = serde_json::from_str(&text).map_err(|e| {
515 AppError::Validation(format!(
516 "failed to parse codex agent_message as ExtractionResult: {e}. text={text}"
517 ))
518 })?;
519
520 Ok((extraction, usage))
521}
522
523use crate::output::emit_json_line as emit_json;
524
525fn collect_matching_files(
527 dir: &Path,
528 pattern: &str,
529 recursive: bool,
530 max_files: usize,
531) -> Result<Vec<PathBuf>, AppError> {
532 let mut files = Vec::new();
533 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
534 files.sort_unstable();
535
536 if files.len() > max_files {
537 return Err(AppError::Validation(format!(
538 "found {} files, exceeds --max-files cap of {}",
539 files.len(),
540 max_files
541 )));
542 }
543
544 Ok(files)
545}
546
547fn open_queue_db<P: AsRef<std::path::Path>>(path: P) -> Result<Connection, AppError> {
549 let conn = Connection::open(path)?;
550
551 conn.execute_batch(
552 "PRAGMA journal_mode=WAL;
553 CREATE TABLE IF NOT EXISTS queue (
554 id INTEGER PRIMARY KEY AUTOINCREMENT,
555 file_path TEXT NOT NULL UNIQUE,
556 name TEXT,
557 status TEXT NOT NULL DEFAULT 'pending',
558 memory_id INTEGER,
559 entities INTEGER DEFAULT 0,
560 rels INTEGER DEFAULT 0,
561 error TEXT,
562 input_tokens INTEGER DEFAULT 0,
563 output_tokens INTEGER DEFAULT 0,
564 attempt INTEGER DEFAULT 0,
565 elapsed_ms INTEGER,
566 created_at TEXT DEFAULT (datetime('now')),
567 done_at TEXT
568 );
569 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
570 )?;
571
572 Ok(conn)
573}
574
575pub fn run_codex_ingest(args: &IngestArgs) -> Result<(), AppError> {
581 let started = Instant::now();
582
583 if !args.dir.exists() {
584 return Err(AppError::Validation(format!(
585 "directory not found: {}",
586 args.dir.display()
587 )));
588 }
589
590 let early_ns = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
595 let early_paths = AppPaths::resolve(args.db.as_deref())?;
596 let queue_path = match args.queue_db.as_deref() {
597 Some(p) => std::path::PathBuf::from(p),
598 None => crate::paths::sidecar_path(&early_paths.db, ".ingest-queue.sqlite"),
599 };
600 let _singleton = crate::lock::acquire_job_singleton(
601 crate::lock::JobType::IngestCodex,
602 &early_ns,
603 &early_paths.db,
604 args.wait_job_singleton,
605 args.force_job_singleton,
606 )?;
607
608 let codex_binary = find_codex_binary(args.codex_binary.as_deref())?;
610 let version = validate_codex_version(&codex_binary)?;
611 tracing::info!(
612 target: "ingest",
613 binary = %codex_binary.display(),
614 version = %version,
615 "Codex CLI binary validated"
616 );
617
618 emit_json(&PhaseEvent {
619 phase: "validate",
620 codex_path: codex_binary.to_str(),
621 version: Some(&version),
622 dir: None,
623 files_total: None,
624 files_new: None,
625 files_existing: None,
626 });
627
628 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
630
631 let queue_conn = open_queue_db(&queue_path)?;
632
633 if args.resume {
634 let reset = queue_conn
635 .execute(
636 "UPDATE queue SET status='pending' WHERE status='processing'",
637 [],
638 )
639 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
640 if reset > 0 {
641 tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
642 }
643 }
644
645 if args.retry_failed {
646 let count = queue_conn
647 .execute(
648 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
649 [],
650 )
651 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
652 tracing::info!(target: "ingest", count, "retrying failed files");
653 }
654
655 if !args.resume && !args.retry_failed {
656 queue_conn
657 .execute("DELETE FROM queue", [])
658 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
659 }
660
661 let mut new_count = 0usize;
662 let mut existing_count = 0usize;
663
664 if !args.retry_failed {
665 for file in &files {
666 let file_str = file.to_string_lossy().into_owned();
667 let inserted = queue_conn
668 .execute(
669 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
670 rusqlite::params![file_str],
671 )
672 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
673 if inserted > 0 {
674 new_count += 1;
675 } else {
676 existing_count += 1;
677 }
678 }
679 }
680
681 emit_json(&PhaseEvent {
682 phase: "scan",
683 codex_path: None,
684 version: None,
685 dir: args.dir.to_str(),
686 files_total: Some(files.len()),
687 files_new: Some(new_count),
688 files_existing: Some(existing_count),
689 });
690
691 if args.dry_run {
692 for (idx, file) in files.iter().enumerate() {
693 let (name, _truncated, _orig) =
694 super::ingest::derive_kebab_name(file, args.max_name_length);
695 emit_json(&FileEvent {
696 file: &file.to_string_lossy(),
697 name: &name,
698 status: "preview",
699 memory_id: None,
700 entities: None,
701 rels: None,
702 cost_usd: None,
703 input_tokens: None,
704 output_tokens: None,
705 elapsed_ms: None,
706 error: None,
707 index: idx,
708 total: files.len(),
709 });
710 }
711 emit_json(&Summary {
712 summary: true,
713 files_total: files.len(),
714 completed: 0,
715 failed: 0,
716 skipped: 0,
717 entities_total: 0,
718 rels_total: 0,
719 input_tokens_total: 0,
720 output_tokens_total: 0,
721 elapsed_ms: started.elapsed().as_millis() as u64,
722 });
723 if !args.keep_queue {
724 let _ = std::fs::remove_file(&queue_path);
725 }
726 return Ok(());
727 }
728
729 let paths = AppPaths::resolve(args.db.as_deref())?;
731 ensure_db_ready(&paths)?;
732 let conn = open_rw(&paths.db)?;
733 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
734 let memory_type_str = args.r#type.as_str().to_string();
735
736 let schema_tempfile = write_schema_tempfile()?;
738 let schema_path = schema_tempfile.path().to_path_buf();
739
740 let mut completed = 0usize;
741 let mut failed = 0usize;
742 let skipped_initial: usize = queue_conn
743 .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
744 r.get::<_, usize>(0)
745 })
746 .unwrap_or(0);
747 let mut skipped = skipped_initial;
748 let mut entities_total = 0usize;
749 let mut rels_total = 0usize;
750 let mut input_tokens_total = 0u64;
751 let mut output_tokens_total = 0u64;
752 let total = files.len();
753
754 let mut backoff_secs = args.rate_limit_wait;
755 let rate_limit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(3600);
756
757 loop {
758 if crate::shutdown_requested() {
759 tracing::info!(target: "ingest", "shutdown requested, stopping before next file");
760 break;
761 }
762
763 let pending: Option<(i64, String)> = queue_conn
764 .query_row(
765 "UPDATE queue SET status='processing', attempt=attempt+1 \
766 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
767 RETURNING id, file_path",
768 [],
769 |row| Ok((row.get(0)?, row.get(1)?)),
770 )
771 .ok();
772
773 let (queue_id, file_path) = match pending {
774 Some(p) => p,
775 None => break,
776 };
777
778 let file_started = Instant::now();
779
780 const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
782 if let Ok(meta) = std::fs::metadata(&file_path) {
783 if meta.len() > MAX_FILE_SIZE {
784 let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
785 let _ = queue_conn.execute(
786 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
787 rusqlite::params![err_msg, queue_id],
788 );
789 let current_index = completed + failed + skipped;
790 failed += 1;
791 emit_json(&FileEvent {
792 file: &file_path,
793 name: "",
794 status: "failed",
795 memory_id: None,
796 entities: None,
797 rels: None,
798 cost_usd: None,
799 input_tokens: None,
800 output_tokens: None,
801 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
802 error: Some(&err_msg),
803 index: current_index,
804 total,
805 });
806 if args.fail_fast {
807 break;
808 }
809 continue;
810 }
811 }
812
813 let file_content = match std::fs::read(&file_path) {
814 Ok(c) => c,
815 Err(e) => {
816 let err_msg = format!("IO error: {e}");
817 let _ = queue_conn.execute(
818 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
819 rusqlite::params![err_msg, queue_id],
820 );
821 let current_index = completed + failed + skipped;
822 failed += 1;
823 emit_json(&FileEvent {
824 file: &file_path,
825 name: "",
826 status: "failed",
827 memory_id: None,
828 entities: None,
829 rels: None,
830 cost_usd: None,
831 input_tokens: None,
832 output_tokens: None,
833 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
834 error: Some(&err_msg),
835 index: current_index,
836 total,
837 });
838 if args.fail_fast {
839 break;
840 }
841 continue;
842 }
843 };
844
845 if file_content.len() > crate::constants::MAX_MEMORY_BODY_LEN {
847 let err_msg = format!(
848 "file body exceeds {} byte limit ({} bytes) — skipping to avoid wasting LLM tokens",
849 crate::constants::MAX_MEMORY_BODY_LEN,
850 file_content.len()
851 );
852 tracing::warn!(target: "ingest", file = %file_path, size = file_content.len(), "body exceeds limit, skipping LLM extraction");
853 let _ = queue_conn.execute(
854 "UPDATE queue SET status='skipped', error=?1, done_at=datetime('now') WHERE id=?2",
855 rusqlite::params![err_msg, queue_id],
856 );
857 let current_index = completed + failed + skipped;
858 skipped += 1;
859 emit_json(&FileEvent {
860 file: &file_path,
861 name: "",
862 status: "skipped",
863 memory_id: None,
864 entities: None,
865 rels: None,
866 cost_usd: None,
867 input_tokens: None,
868 output_tokens: None,
869 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
870 error: Some(&err_msg),
871 index: current_index,
872 total,
873 });
874 continue;
875 }
876
877 let max_extract_attempts: u32 = 2;
879 let mut extraction_result: Option<(ExtractionResult, Option<CodexUsage>)> = None;
880 let mut last_extract_err: Option<String> = None;
881 let mut last_was_rate_limited = false;
882
883 for attempt in 1..=max_extract_attempts {
884 match extract_with_codex(
885 &codex_binary,
886 &file_content,
887 args.codex_model.as_deref(),
888 args.codex_timeout,
889 &schema_path,
890 ) {
891 Ok(result) => {
892 extraction_result = Some(result);
893 break;
894 }
895 Err(ref e) if matches!(e, AppError::RateLimited { .. }) => {
896 last_extract_err = Some(format!("{e}"));
897 last_was_rate_limited = true;
898 break;
899 }
900 Err(e) => {
901 let msg = format!("{e}");
902 if attempt < max_extract_attempts {
903 let cold_start_delay = 2 * attempt as u64;
904 tracing::warn!(
905 target: "ingest",
906 attempt,
907 delay_secs = cold_start_delay,
908 error = %msg,
909 "codex extraction failed, retrying"
910 );
911 std::thread::sleep(std::time::Duration::from_secs(cold_start_delay));
912 }
913 last_extract_err = Some(msg);
914 }
915 }
916 }
917
918 if let Some((extraction, usage)) = extraction_result {
919 backoff_secs = args.rate_limit_wait;
920
921 let in_tok = usage.as_ref().map(|u| u.input_tokens).unwrap_or(0);
922 let out_tok = usage.as_ref().map(|u| u.output_tokens).unwrap_or(0);
923
924 let name = &extraction.name;
925 let ent_count = extraction.entities.len();
926 let rel_count = 0;
927
928 let new_entities: Vec<NewEntity> = extraction
931 .entities
932 .iter()
933 .map(|e| NewEntity {
934 name: e.name.clone(),
935 entity_type: EntityType::map_to_canonical(&e.entity_type),
936 description: None,
937 })
938 .collect();
939
940 let new_relationships: Vec<NewRelationship> = extraction
943 .relationships
944 .iter()
945 .map(|r| NewRelationship {
946 source: r.source.clone(),
947 target: r.target.clone(),
948 relation: crate::parsers::map_to_canonical_relation(&r.relation),
949 strength: r.strength,
950 description: None,
951 })
952 .collect();
953
954 let body_str = String::from_utf8(file_content.clone())
955 .map_err(|e| AppError::Validation(format!("file is not valid UTF-8: {e}")))?;
956 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
957 let new_memory = NewMemory {
958 name: name.clone(),
959 namespace: namespace.clone(),
960 memory_type: memory_type_str.clone(),
961 description: extraction.description.clone(),
962 body: body_str.to_string(),
963 body_hash,
964 session_id: None,
965 source: "agent".to_string(),
966 metadata: serde_json::Value::Object(serde_json::Map::new()),
967 };
968
969 let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
971 Some((existing_id, is_deleted)) => {
972 if is_deleted {
973 memories::clear_deleted_at(&conn, existing_id)?;
974 }
975 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
976 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
977 rusqlite::params![existing_id],
978 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
979 )?;
980 memories::update(&conn, existing_id, &new_memory, None)?;
981 memories::sync_fts_after_update(
982 &conn,
983 existing_id,
984 &old_name,
985 &old_desc,
986 &old_body,
987 &new_memory.name,
988 &new_memory.description,
989 &new_memory.body,
990 )?;
991 tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
992 existing_id
993 }
994 None => match memories::insert(&conn, &new_memory) {
995 Ok(id) => id,
996 Err(e) => {
997 let err_msg = format!("{e}");
998 let _ = queue_conn.execute(
999 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1000 rusqlite::params![err_msg, queue_id],
1001 );
1002 let current_index = completed + failed + skipped;
1003 failed += 1;
1004 emit_json(&FileEvent {
1005 file: &file_path,
1006 name,
1007 status: "failed",
1008 memory_id: None,
1009 entities: None,
1010 rels: None,
1011 cost_usd: None,
1012 input_tokens: Some(in_tok),
1013 output_tokens: Some(out_tok),
1014 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1015 error: Some(&err_msg),
1016 index: current_index,
1017 total,
1018 });
1019 input_tokens_total += in_tok;
1020 output_tokens_total += out_tok;
1021 if args.fail_fast {
1022 break;
1023 }
1024 continue;
1025 }
1026 },
1027 };
1028
1029 for ent in &new_entities {
1030 if let Ok(eid) = entities::upsert_entity(&conn, &namespace, ent) {
1031 let _ = entities::link_memory_entity(&conn, memory_id, eid);
1032 }
1033 }
1034 for rel in &new_relationships {
1035 crate::parsers::warn_if_non_canonical(&rel.relation);
1036 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
1037 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
1038 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1039 let _ = conn.execute(
1040 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1041 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1042 );
1043 }
1044 }
1045
1046 let _ = queue_conn.execute(
1047 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, \
1048 input_tokens=?5, output_tokens=?6, elapsed_ms=?7, done_at=datetime('now') WHERE id=?8",
1049 rusqlite::params![
1050 name,
1051 memory_id,
1052 ent_count,
1053 rel_count,
1054 in_tok,
1055 out_tok,
1056 file_started.elapsed().as_millis() as i64,
1057 queue_id
1058 ],
1059 );
1060
1061 let current_index = completed + failed + skipped;
1062 completed += 1;
1063 entities_total += ent_count;
1064 rels_total += rel_count;
1065 input_tokens_total += in_tok;
1066 output_tokens_total += out_tok;
1067
1068 emit_json(&FileEvent {
1069 file: &file_path,
1070 name,
1071 status: "done",
1072 memory_id: Some(memory_id),
1073 entities: Some(ent_count),
1074 rels: Some(rel_count),
1075 cost_usd: None,
1076 input_tokens: Some(in_tok),
1077 output_tokens: Some(out_tok),
1078 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1079 error: None,
1080 index: current_index,
1081 total,
1082 });
1083 } else if let Some(ref err_str) = last_extract_err {
1084 if last_was_rate_limited {
1085 if crate::retry::is_kill_switch_active() {
1086 tracing::warn!(target: "ingest", "SQLITE_GRAPHRAG_DISABLE_RETRY=1, skipping rate-limit retry");
1087 } else if std::time::Instant::now() >= rate_limit_deadline {
1088 tracing::error!(target: "ingest", "rate-limit retry deadline (1h) exhausted");
1089 } else {
1090 let half = backoff_secs / 2;
1091 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
1092 let actual_wait = half + jitter;
1093 tracing::warn!(target: "ingest", delay_secs = actual_wait, error_kind = "rate_limited", "Codex rate limited, backing off");
1094 let _ = queue_conn.execute(
1095 "UPDATE queue SET status='pending' WHERE id=?1",
1096 rusqlite::params![queue_id],
1097 );
1098 std::thread::sleep(std::time::Duration::from_secs(actual_wait));
1099 backoff_secs = (backoff_secs * 2).min(900);
1100 continue;
1101 }
1102 } else {
1103 let _ = queue_conn.execute(
1104 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1105 rusqlite::params![err_str, queue_id],
1106 );
1107 let current_index = completed + failed + skipped;
1108 failed += 1;
1109 emit_json(&FileEvent {
1110 file: &file_path,
1111 name: "",
1112 status: "failed",
1113 memory_id: None,
1114 entities: None,
1115 rels: None,
1116 cost_usd: None,
1117 input_tokens: None,
1118 output_tokens: None,
1119 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1120 error: Some(err_str),
1121 index: current_index,
1122 total,
1123 });
1124 if args.fail_fast {
1125 break;
1126 }
1127 }
1128 }
1129 }
1130
1131 let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);");
1133
1134 emit_json(&Summary {
1136 summary: true,
1137 files_total: total,
1138 completed,
1139 failed,
1140 skipped,
1141 entities_total,
1142 rels_total,
1143 input_tokens_total,
1144 output_tokens_total,
1145 elapsed_ms: started.elapsed().as_millis() as u64,
1146 });
1147
1148 if !args.keep_queue && failed == 0 {
1149 let _ = std::fs::remove_file(&queue_path);
1150 }
1151
1152 Ok(())
1153}
1154
#[cfg(test)]
mod tests {
    // Unit tests for the Codex JSONL output parser and the structured-output schema
    // passed to `codex exec --output-schema`.
    use super::*;

    /// Builds a Codex `item.completed` JSONL event carrying an `agent_message` whose
    /// `text` is `text`, JSON-escaped so arbitrary content survives embedding.
    fn make_agent_message_event(text: &str) -> String {
        format!(
            r#"{{"type":"item.completed","item":{{"id":"item_0","type":"agent_message","text":{}}}}}"#,
            serde_json::to_string(text).unwrap()
        )
    }

    /// Builds a Codex `turn.completed` JSONL event reporting the given token usage.
    fn make_usage_event(input: u64, output: u64) -> String {
        format!(
            r#"{{"type":"turn.completed","usage":{{"input_tokens":{input},"output_tokens":{output}}}}}"#
        )
    }

    /// A minimal extraction payload that conforms to `EXTRACTION_SCHEMA_CODEX`.
    fn valid_extraction_json() -> String {
        r#"{"name":"test-module","description":"A test module for unit testing purposes","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-module","relation":"applies-to","strength":0.8}]}"#.to_string()
    }

    /// Asserts that the object schema `node` lists every key of `properties` in
    /// `required`, and that `required` names no undeclared property.
    ///
    /// Strict structured-output schemas reject optional properties, so a field added
    /// to the schema without a matching `required` entry would otherwise surface only
    /// as a runtime schema error from Codex.
    fn assert_all_properties_required(node: &serde_json::Value, label: &str) {
        let properties = node["properties"]
            .as_object()
            .unwrap_or_else(|| panic!("{label} must declare properties"));
        let required: Vec<&str> = node["required"]
            .as_array()
            .unwrap_or_else(|| panic!("{label} must declare required"))
            .iter()
            .filter_map(serde_json::Value::as_str)
            .collect();

        for key in properties.keys() {
            assert!(
                required.contains(&key.as_str()),
                "{label}: property '{key}' is missing from required"
            );
        }
        for name in &required {
            assert!(
                properties.contains_key(*name),
                "{label}: required entry '{name}' is not a declared property"
            );
        }
    }

    #[test]
    fn test_parse_codex_output_valid() {
        let jsonl = format!(
            "{}\n{}\n{}",
            r#"{"type":"thread.started","thread_id":"t1"}"#,
            make_agent_message_event(&valid_extraction_json()),
            make_usage_event(100, 50),
        );

        let (result, usage) = parse_codex_output(&jsonl).expect("parse must succeed");
        assert_eq!(result.name, "test-module");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        let u = usage.expect("usage must be present");
        assert_eq!(u.input_tokens, 100);
        assert_eq!(u.output_tokens, 50);
    }

    #[test]
    fn test_parse_codex_output_turn_failed() {
        let jsonl = format!(
            "{}\n{}",
            r#"{"type":"thread.started","thread_id":"t1"}"#,
            r#"{"type":"turn.failed","error":{"message":"model error occurred"}}"#,
        );

        let err = parse_codex_output(&jsonl).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("turn failed"),
            "expected 'turn failed' in: {msg}"
        );
        assert!(msg.contains("model error occurred"));
    }

    #[test]
    fn test_parse_codex_output_rate_limit() {
        let jsonl = r#"{"type":"turn.failed","error":{"message":"rate_limit exceeded, 429 Too Many Requests"}}"#;

        let err = parse_codex_output(jsonl).unwrap_err();
        assert!(
            matches!(err, AppError::RateLimited { .. }),
            "expected AppError::RateLimited, got: {err}"
        );
    }

    #[test]
    fn test_parse_codex_output_schema_error() {
        let jsonl = r#"{"type":"error","message":"invalid_json_schema: additional properties not allowed"}"#;

        let err = parse_codex_output(jsonl).unwrap_err();
        let msg = format!("{err}");
        // "schema" subsumes "invalid_json_schema"; a single check states the real condition.
        assert!(msg.contains("schema"), "expected schema error in: {msg}");
    }

    #[test]
    fn test_extraction_schema_codex_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA_CODEX).expect("schema must be valid JSON");
    }

    #[test]
    fn test_extraction_schema_codex_has_additional_properties_false() {
        let schema: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA_CODEX).expect("schema must be valid JSON");

        assert_eq!(
            schema["additionalProperties"].as_bool(),
            Some(false),
            "root must have additionalProperties: false"
        );

        assert_eq!(
            schema["properties"]["entities"]["items"]["additionalProperties"].as_bool(),
            Some(false),
            "entity items must have additionalProperties: false"
        );

        assert_eq!(
            schema["properties"]["relationships"]["items"]["additionalProperties"].as_bool(),
            Some(false),
            "relationship items must have additionalProperties: false"
        );
    }

    #[test]
    fn test_extraction_schema_codex_all_properties_required() {
        let schema: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA_CODEX).expect("schema must be valid JSON");

        assert_all_properties_required(&schema, "root");
        assert_all_properties_required(&schema["properties"]["entities"]["items"], "entity items");
        assert_all_properties_required(
            &schema["properties"]["relationships"]["items"],
            "relationship items",
        );
    }

    #[test]
    fn test_parse_codex_output_last_agent_message_wins() {
        let first_text = r#"{"name":"first-result","description":"First result should be ignored","entities":[],"relationships":[]}"#;
        let second_text = r#"{"name":"final-result","description":"Final result wins over earlier ones","entities":[{"name":"final-entity","entity_type":"concept"}],"relationships":[]}"#;

        let jsonl = format!(
            "{}\n{}\n{}\n{}",
            r#"{"type":"thread.started","thread_id":"t1"}"#,
            make_agent_message_event(first_text),
            make_agent_message_event(second_text),
            make_usage_event(200, 80),
        );

        let (result, _) = parse_codex_output(&jsonl).expect("parse must succeed");
        assert_eq!(result.name, "final-result", "last agent_message should win");
        assert_eq!(result.entities.len(), 1);
    }

    #[test]
    fn test_parse_codex_output_skips_malformed_lines() {
        // `{{broken` renders as `{broken`: an unterminated JSON object line.
        let jsonl = format!(
            "not json at all\n{}\n{{broken\n{}",
            make_agent_message_event(&valid_extraction_json()),
            make_usage_event(10, 5),
        );

        let (result, _) = parse_codex_output(&jsonl).expect("malformed lines must be skipped");
        assert_eq!(result.name, "test-module");
    }
}