1use crate::commands::ingest::IngestArgs;
12use crate::commands::ingest_claude::ExtractionResult;
13use crate::entity_type::EntityType;
14use crate::errors::AppError;
15use crate::paths::AppPaths;
16use crate::storage::connection::{ensure_db_ready, open_rw};
17use crate::storage::entities::{self, NewEntity, NewRelationship};
18use crate::storage::memories::{self, NewMemory};
19
20use rusqlite::Connection;
21use serde::{Deserialize, Serialize};
22use std::io::Write;
23use std::path::{Path, PathBuf};
24use std::process::{Command, Stdio};
25use std::time::Instant;
26
/// Oldest Codex CLI release accepted by `validate_codex_version`; anything older is rejected
/// before any file is processed.
const MIN_CODEX_VERSION: &str = "0.120.0";
28
/// JSON Schema handed to `codex exec --output-schema` (written to a temp file by
/// `write_schema_tempfile`) to constrain the agent's final message to the shape that
/// `parse_codex_output` deserializes into `ExtractionResult`.
///
/// Every object schema lists all of its properties in `required` and sets
/// `additionalProperties: false` (presumably what Codex/OpenAI strict structured output
/// expects; TODO confirm). The `relation` enum mirrors the relationship types listed in
/// `EXTRACTION_PROMPT`. The `entity_type` enum is broader than the examples in the prompt text.
const EXTRACTION_SCHEMA_CODEX: &str = r#"{
  "type": "object",
  "properties": {
    "name": { "type": "string" },
    "description": { "type": "string" },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": { "type": "string" },
          "entity_type": {
            "type": "string",
            "enum": ["project","tool","person","file","concept","incident","decision","organization","location","date"]
          }
        },
        "required": ["name", "entity_type"],
        "additionalProperties": false
      }
    },
    "relationships": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "source": { "type": "string" },
          "target": { "type": "string" },
          "relation": {
            "type": "string",
            "enum": ["applies-to","uses","depends-on","causes","fixes","contradicts","supports","follows","related","replaces","tracked-in"]
          },
          "strength": { "type": "number", "minimum": 0, "maximum": 1 }
        },
        "required": ["source","target","relation","strength"],
        "additionalProperties": false
      }
    }
  },
  "required": ["name","description","entities","relationships"],
  "additionalProperties": false
}"#;
71
/// Instructions prepended to every document sent to Codex on stdin (see `extract_with_codex`).
///
/// Each source line ends in `\n\`: the `\n` is part of the prompt, while the trailing backslash
/// joins the next source line and strips its leading whitespace, so source indentation never
/// leaks into the prompt text.
const EXTRACTION_PROMPT: &str = "You are a knowledge graph entity extractor. Given a document, extract:\n\
1. A short kebab-case name (max 60 chars) capturing the document's main topic\n\
2. A one-sentence description (10-20 words) summarizing the key insight\n\
3. Domain-specific entities (concepts, tools, people, decisions, projects, files)\n\
4. Typed relationships between entities with strength scores\n\n\
Rules:\n\
- Entity names: lowercase kebab-case, 2+ chars, domain-specific only\n\
- NEVER extract generic terms, stop words, numbers, UUIDs, or single characters\n\
- Relationship types MUST be one of: applies-to, uses, depends-on, causes, fixes, contradicts, supports, follows, related, replaces, tracked-in\n\
- NEVER use 'mentions' as relationship type\n\
- Strength: 0.9 for hard dependencies, 0.7 for design relationships, 0.5 for contextual links, 0.3 for weak references\n\
- Prefer fewer high-quality entities over many low-quality ones\n\
- Description must answer: What is this about and WHY does it matter?";
85
/// Token usage taken from the `usage` object of a Codex `turn.completed` event.
///
/// `cached_input_tokens` and `reasoning_output_tokens` default to 0 when absent.
/// `input_tokens` and `output_tokens` are required. A `usage` object missing either one fails
/// to deserialize and is ignored by `parse_codex_output`.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct CodexUsage {
    input_tokens: u64,
    #[serde(default)]
    cached_input_tokens: u64,
    output_tokens: u64,
    #[serde(default)]
    reasoning_output_tokens: u64,
}
96
/// Phase-level progress line written to stdout via `emit_json`.
///
/// `run_codex_ingest` emits one with `phase: "validate"` (binary path and version) and one
/// with `phase: "scan"` (directory and file counts). Fields that are `None` are omitted from
/// the serialized JSON.
#[derive(Debug, Serialize)]
struct PhaseEvent<'a> {
    phase: &'a str,
    /// Codex binary path; `None` when the path is not valid UTF-8 or the phase is not `validate`.
    #[serde(skip_serializing_if = "Option::is_none")]
    codex_path: Option<&'a str>,
    /// Trimmed `codex --version` output (`validate` phase).
    #[serde(skip_serializing_if = "Option::is_none")]
    version: Option<&'a str>,
    /// Scanned directory (`scan` phase); `None` when the path is not valid UTF-8.
    #[serde(skip_serializing_if = "Option::is_none")]
    dir: Option<&'a str>,
    /// Number of files matched by the scan.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_total: Option<usize>,
    /// Matched files newly inserted into the work queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_new: Option<usize>,
    /// Matched files that were already present in the work queue.
    #[serde(skip_serializing_if = "Option::is_none")]
    files_existing: Option<usize>,
}
113
/// Per-file progress line written to stdout via `emit_json`.
///
/// In this module `status` is `"preview"` (dry run), `"done"`, or `"failed"`. Fields that are
/// `None` are omitted from the serialized JSON.
#[derive(Debug, Serialize)]
struct FileEvent<'a> {
    /// Path of the input file.
    file: &'a str,
    /// Memory name: filename-derived for previews, model-extracted for `done`, and empty when
    /// a file failed before extraction produced a name.
    name: &'a str,
    status: &'a str,
    /// Id of the memory that was written (`done` only).
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Number of entities the model extracted.
    #[serde(skip_serializing_if = "Option::is_none")]
    entities: Option<usize>,
    /// Number of relationships the model extracted.
    #[serde(skip_serializing_if = "Option::is_none")]
    rels: Option<usize>,
    /// Never populated by the Codex path (always `None`). NOTE(review): presumably kept for
    /// output-shape compatibility with other ingest backends; confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_usd: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    input_tokens: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    output_tokens: Option<u64>,
    /// Wall-clock time spent on this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    elapsed_ms: Option<u64>,
    /// Failure reason (`failed` only).
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Zero-based position of this file in the run, counting rows already `done` at start-up.
    index: usize,
    /// Number of files matched by the scan.
    total: usize,
}
139
/// Final line written to stdout via `emit_json` at the end of a run (including dry runs).
#[derive(Debug, Serialize)]
struct Summary {
    /// Always `true` in this module; presumably lets stream consumers tell the summary line
    /// apart from per-file events.
    summary: bool,
    files_total: usize,
    /// Files extracted and persisted in this run.
    completed: usize,
    /// Files that failed in this run.
    failed: usize,
    /// Queue rows that were already `done` when processing started.
    skipped: usize,
    entities_total: usize,
    rels_total: usize,
    input_tokens_total: u64,
    output_tokens_total: u64,
    /// Wall-clock time of the whole run.
    elapsed_ms: u64,
}
153
154pub fn find_codex_binary(explicit: Option<&Path>) -> Result<PathBuf, AppError> {
161 if let Some(p) = explicit {
162 if p.exists() {
163 return Ok(p.to_path_buf());
164 }
165 return Err(AppError::Validation(format!(
166 "Codex CLI binary not found at explicit path: {}",
167 p.display()
168 )));
169 }
170
171 if let Ok(env_path) = std::env::var("SQLITE_GRAPHRAG_CODEX_BINARY") {
172 let p = PathBuf::from(&env_path);
173 if p.exists() {
174 return Ok(p);
175 }
176 }
177
178 let name = if cfg!(windows) { "codex.exe" } else { "codex" };
179 if let Some(path_var) = std::env::var_os("PATH") {
180 for dir in std::env::split_paths(&path_var) {
181 let candidate = dir.join(name);
182 if candidate.exists() {
183 return Ok(candidate);
184 }
185 }
186 }
187
188 Err(AppError::Validation(
189 "Codex CLI binary not found in PATH. Install it from https://github.com/openai/codex or specify --codex-binary".to_string(),
190 ))
191}
192
193fn validate_codex_version(binary: &Path) -> Result<String, AppError> {
200 let output = Command::new(binary)
201 .arg("--version")
202 .stdin(Stdio::null())
203 .stdout(Stdio::piped())
204 .stderr(Stdio::piped())
205 .output()
206 .map_err(AppError::Io)?;
207
208 let raw = String::from_utf8(output.stdout)
209 .map_err(|_| AppError::Validation("codex --version output is not UTF-8".to_string()))?;
210
211 let version_str = raw.trim().to_string();
212
213 let numeric = version_str.split_whitespace().last().unwrap_or("").trim();
215
216 fn parse_semver(s: &str) -> Option<(u64, u64, u64)> {
217 let parts: Vec<&str> = s.splitn(3, '.').collect();
218 if parts.len() < 2 {
219 return None;
220 }
221 let major = parts[0].parse::<u64>().ok()?;
222 let minor = parts[1].parse::<u64>().ok()?;
223 let patch = parts
224 .get(2)
225 .and_then(|p| p.parse::<u64>().ok())
226 .unwrap_or(0);
227 Some((major, minor, patch))
228 }
229
230 if let (Some(actual), Some(min)) = (parse_semver(numeric), parse_semver(MIN_CODEX_VERSION)) {
231 if actual < min {
232 return Err(AppError::Validation(format!(
233 "Codex CLI version {numeric} is below minimum required {MIN_CODEX_VERSION}"
234 )));
235 }
236 }
237
238 Ok(version_str)
239}
240
241fn write_schema_tempfile() -> Result<tempfile::NamedTempFile, AppError> {
247 let mut f = tempfile::NamedTempFile::new().map_err(AppError::Io)?;
248 std::io::Write::write_all(&mut f, EXTRACTION_SCHEMA_CODEX.as_bytes()).map_err(AppError::Io)?;
249 std::io::Write::flush(&mut f).map_err(AppError::Io)?;
250 Ok(f)
251}
252
253fn extract_with_codex(
264 binary: &Path,
265 file_content: &[u8],
266 model: Option<&str>,
267 timeout_secs: u64,
268 schema_file: &Path,
269) -> Result<(ExtractionResult, Option<CodexUsage>), AppError> {
270 use wait_timeout::ChildExt;
271
272 let mut cmd = Command::new(binary);
273
274 cmd.env_clear();
275 for var in &[
276 "PATH",
277 "HOME",
278 "USER",
279 "SHELL",
280 "TERM",
281 "LANG",
282 "XDG_CONFIG_HOME",
283 "XDG_DATA_HOME",
284 "XDG_RUNTIME_DIR",
285 "XDG_CACHE_HOME",
286 "OPENAI_API_KEY",
287 "CODEX_ACCESS_TOKEN",
288 "CODEX_HOME",
289 "TMPDIR",
290 "TMP",
291 "TEMP",
292 "DYLD_FALLBACK_LIBRARY_PATH",
293 ] {
294 if let Ok(val) = std::env::var(var) {
295 cmd.env(var, val);
296 }
297 }
298
299 #[cfg(windows)]
300 for var in &[
301 "LOCALAPPDATA",
302 "APPDATA",
303 "USERPROFILE",
304 "SystemRoot",
305 "COMSPEC",
306 "PATHEXT",
307 ] {
308 if let Ok(val) = std::env::var(var) {
309 cmd.env(var, val);
310 }
311 }
312
313 cmd.arg("exec")
314 .arg("--json")
315 .arg("--output-schema")
316 .arg(schema_file)
317 .arg("--ephemeral")
318 .arg("--skip-git-repo-check")
319 .arg("--sandbox")
320 .arg("read-only")
321 .arg("--ignore-user-config")
322 .arg("--ignore-rules");
323
324 if let Some(m) = model {
325 cmd.arg("-m").arg(m);
326 }
327
328 cmd.arg("-");
330
331 cmd.stdin(Stdio::piped())
332 .stdout(Stdio::piped())
333 .stderr(Stdio::piped());
334
335 let mut child = cmd.spawn().map_err(|e| {
336 AppError::Io(std::io::Error::new(
337 e.kind(),
338 format!("failed to spawn codex: {e}"),
339 ))
340 })?;
341
342 let file_utf8 = String::from_utf8_lossy(file_content);
344 let stdin_payload = format!("{EXTRACTION_PROMPT}\n\n---\n\nDocument content:\n\n{file_utf8}");
345 let stdin_bytes = stdin_payload.into_bytes();
346
347 let mut child_stdin = child
348 .stdin
349 .take()
350 .ok_or_else(|| AppError::Validation("failed to open codex stdin".into()))?;
351 let stdin_thread = std::thread::spawn(move || -> Result<(), std::io::Error> {
352 child_stdin.write_all(&stdin_bytes)?;
353 Ok(())
354 });
355
356 let timeout = std::time::Duration::from_secs(timeout_secs);
357 let status = child.wait_timeout(timeout).map_err(AppError::Io)?;
358
359 match status {
360 Some(exit_status) => {
361 stdin_thread
362 .join()
363 .map_err(|_| AppError::Validation("stdin thread panicked".into()))?
364 .map_err(AppError::Io)?;
365
366 let mut stdout_buf = Vec::new();
367 let mut stderr_buf = Vec::new();
368 if let Some(mut out) = child.stdout.take() {
369 std::io::Read::read_to_end(&mut out, &mut stdout_buf).map_err(AppError::Io)?;
370 }
371 if let Some(mut err) = child.stderr.take() {
372 std::io::Read::read_to_end(&mut err, &mut stderr_buf).map_err(AppError::Io)?;
373 }
374
375 if !exit_status.success() {
376 let stderr_str = String::from_utf8_lossy(&stderr_buf);
377 let stdout_str = String::from_utf8_lossy(&stdout_buf);
378 if let Ok((result, usage)) = parse_codex_output(&stdout_str) {
380 return Ok((result, usage));
381 }
382 if stderr_str.contains("401")
383 || stderr_str.contains("Unauthorized")
384 || stderr_str.contains("auth")
385 {
386 tracing::warn!(
387 target: "ingest",
388 "Codex CLI authentication expired. Re-authenticate with: codex auth login"
389 );
390 }
391 return Err(AppError::Validation(format!(
392 "codex exec exited with code {:?}: {}",
393 exit_status.code(),
394 stderr_str.trim()
395 )));
396 }
397
398 let stdout = String::from_utf8(stdout_buf)
399 .map_err(|_| AppError::Validation("codex exec stdout is not valid UTF-8".into()))?;
400 parse_codex_output(&stdout)
401 }
402 None => {
403 tracing::warn!(target: "ingest", timeout_secs, "codex exec timed out, killing process");
404 let _ = child.kill();
405 let _ = child.wait();
406 let _ = stdin_thread.join();
407 Err(AppError::Validation(format!(
408 "codex exec timed out after {timeout_secs} seconds"
409 )))
410 }
411 }
412}
413
414fn parse_codex_output(stdout: &str) -> Result<(ExtractionResult, Option<CodexUsage>), AppError> {
429 let mut last_agent_text: Option<String> = None;
430 let mut usage: Option<CodexUsage> = None;
431 let mut rate_limited = false;
432 let mut schema_error = false;
433 let mut turn_failed = false;
434 let mut failed_message = String::new();
435
436 for line in stdout.lines() {
437 let line = line.trim();
438 if line.is_empty() {
439 continue;
440 }
441
442 let event: serde_json::Value = match serde_json::from_str(line) {
443 Ok(v) => v,
444 Err(_) => {
445 tracing::warn!(target: "ingest", line, "codex output: skipping malformed JSONL line");
446 continue;
447 }
448 };
449
450 let event_type = match event.get("type").and_then(|t| t.as_str()) {
451 Some(t) => t,
452 None => continue,
453 };
454
455 match event_type {
456 "item.completed" => {
457 if let Some(item) = event.get("item") {
459 if item.get("type").and_then(|t| t.as_str()) == Some("agent_message") {
460 if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
461 last_agent_text = Some(text.to_string());
462 }
463 }
464 }
465 }
466 "turn.completed" => {
467 if let Some(u) = event.get("usage") {
468 if let Ok(parsed) = serde_json::from_value::<CodexUsage>(u.clone()) {
469 usage = Some(parsed);
470 }
471 }
472 }
473 "turn.failed" => {
474 turn_failed = true;
475 if let Some(err) = event.get("error") {
476 let msg = err
477 .get("message")
478 .and_then(|m| m.as_str())
479 .unwrap_or("unknown error");
480 failed_message = msg.to_string();
481 if msg.contains("rate_limit")
482 || msg.contains("429")
483 || msg.contains("Too Many Requests")
484 {
485 rate_limited = true;
486 }
487 }
488 }
489 "error" => {
490 if let Some(msg) = event.get("message").and_then(|m| m.as_str()) {
491 if msg.contains("invalid_json_schema") || msg.contains("schema") {
492 schema_error = true;
493 }
494 tracing::warn!(target: "ingest", error_msg = msg, "codex error event received");
495 }
496 }
497 _ => {
498 }
500 }
501 }
502
503 if rate_limited {
504 return Err(AppError::Validation(format!(
505 "RATE_LIMITED: {failed_message}"
506 )));
507 }
508
509 if schema_error {
510 return Err(AppError::Validation(
511 "codex rejected the output schema (invalid_json_schema)".to_string(),
512 ));
513 }
514
515 if turn_failed {
516 return Err(AppError::Validation(format!(
517 "codex turn failed: {failed_message}"
518 )));
519 }
520
521 let text = last_agent_text.ok_or_else(|| {
522 AppError::Validation("codex output contained no agent_message item".to_string())
523 })?;
524
525 let extraction: ExtractionResult = serde_json::from_str(&text).map_err(|e| {
526 AppError::Validation(format!(
527 "failed to parse codex agent_message as ExtractionResult: {e}. text={text}"
528 ))
529 })?;
530
531 Ok((extraction, usage))
532}
533
534fn emit_json<T: Serialize>(value: &T) {
535 if let Ok(json) = serde_json::to_string(value) {
536 let stdout = std::io::stdout();
537 let mut lock = stdout.lock();
538 let _ = writeln!(lock, "{json}");
539 let _ = lock.flush();
540 }
541}
542
543fn collect_matching_files(
545 dir: &Path,
546 pattern: &str,
547 recursive: bool,
548 max_files: usize,
549) -> Result<Vec<PathBuf>, AppError> {
550 let mut files = Vec::new();
551 super::ingest::collect_files(dir, pattern, recursive, &mut files)?;
552 files.sort();
553
554 if files.len() > max_files {
555 return Err(AppError::Validation(format!(
556 "found {} files, exceeds --max-files cap of {}",
557 files.len(),
558 max_files
559 )));
560 }
561
562 Ok(files)
563}
564
565fn open_queue_db(path: &str) -> Result<Connection, AppError> {
567 let conn = Connection::open(path)?;
568
569 conn.execute_batch(
570 "PRAGMA journal_mode=WAL;
571 CREATE TABLE IF NOT EXISTS queue (
572 id INTEGER PRIMARY KEY AUTOINCREMENT,
573 file_path TEXT NOT NULL UNIQUE,
574 name TEXT,
575 status TEXT NOT NULL DEFAULT 'pending',
576 memory_id INTEGER,
577 entities INTEGER DEFAULT 0,
578 rels INTEGER DEFAULT 0,
579 error TEXT,
580 input_tokens INTEGER DEFAULT 0,
581 output_tokens INTEGER DEFAULT 0,
582 attempt INTEGER DEFAULT 0,
583 elapsed_ms INTEGER,
584 created_at TEXT DEFAULT (datetime('now')),
585 done_at TEXT
586 );
587 CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);",
588 )?;
589
590 Ok(conn)
591}
592
593pub fn run_codex_ingest(args: &IngestArgs) -> Result<(), AppError> {
599 let started = Instant::now();
600
601 if !args.dir.exists() {
602 return Err(AppError::Validation(format!(
603 "directory not found: {}",
604 args.dir.display()
605 )));
606 }
607
608 let codex_binary = find_codex_binary(args.codex_binary.as_deref())?;
610 let version = validate_codex_version(&codex_binary)?;
611 tracing::info!(
612 target: "ingest",
613 binary = %codex_binary.display(),
614 version = %version,
615 "Codex CLI binary validated"
616 );
617
618 emit_json(&PhaseEvent {
619 phase: "validate",
620 codex_path: codex_binary.to_str(),
621 version: Some(&version),
622 dir: None,
623 files_total: None,
624 files_new: None,
625 files_existing: None,
626 });
627
628 let files = collect_matching_files(&args.dir, &args.pattern, args.recursive, args.max_files)?;
630
631 let queue_conn = open_queue_db(&args.queue_db)?;
632
633 if args.resume {
634 let reset = queue_conn
635 .execute(
636 "UPDATE queue SET status='pending' WHERE status='processing'",
637 [],
638 )
639 .map_err(|e| AppError::Validation(format!("queue resume failed: {e}")))?;
640 if reset > 0 {
641 tracing::info!(target: "ingest", count = reset, "reset stuck processing files to pending");
642 }
643 }
644
645 if args.retry_failed {
646 let count = queue_conn
647 .execute(
648 "UPDATE queue SET status='pending', attempt=0 WHERE status='failed'",
649 [],
650 )
651 .map_err(|e| AppError::Validation(format!("queue retry-failed reset failed: {e}")))?;
652 tracing::info!(target: "ingest", count, "retrying failed files");
653 }
654
655 if !args.resume && !args.retry_failed {
656 queue_conn
657 .execute("DELETE FROM queue", [])
658 .map_err(|e| AppError::Validation(format!("queue clear failed: {e}")))?;
659 }
660
661 let mut new_count = 0usize;
662 let mut existing_count = 0usize;
663
664 if !args.retry_failed {
665 for file in &files {
666 let file_str = file.to_string_lossy().to_string();
667 let inserted = queue_conn
668 .execute(
669 "INSERT OR IGNORE INTO queue (file_path, status) VALUES (?1, 'pending')",
670 rusqlite::params![file_str],
671 )
672 .map_err(|e| AppError::Validation(format!("queue insert failed: {e}")))?;
673 if inserted > 0 {
674 new_count += 1;
675 } else {
676 existing_count += 1;
677 }
678 }
679 }
680
681 emit_json(&PhaseEvent {
682 phase: "scan",
683 codex_path: None,
684 version: None,
685 dir: args.dir.to_str(),
686 files_total: Some(files.len()),
687 files_new: Some(new_count),
688 files_existing: Some(existing_count),
689 });
690
691 if args.dry_run {
692 for (idx, file) in files.iter().enumerate() {
693 let (name, _truncated, _orig) =
694 super::ingest::derive_kebab_name(file, args.max_name_length);
695 emit_json(&FileEvent {
696 file: &file.to_string_lossy(),
697 name: &name,
698 status: "preview",
699 memory_id: None,
700 entities: None,
701 rels: None,
702 cost_usd: None,
703 input_tokens: None,
704 output_tokens: None,
705 elapsed_ms: None,
706 error: None,
707 index: idx,
708 total: files.len(),
709 });
710 }
711 emit_json(&Summary {
712 summary: true,
713 files_total: files.len(),
714 completed: 0,
715 failed: 0,
716 skipped: 0,
717 entities_total: 0,
718 rels_total: 0,
719 input_tokens_total: 0,
720 output_tokens_total: 0,
721 elapsed_ms: started.elapsed().as_millis() as u64,
722 });
723 if !args.keep_queue {
724 let _ = std::fs::remove_file(&args.queue_db);
725 }
726 return Ok(());
727 }
728
729 let paths = AppPaths::resolve(args.db.as_deref())?;
731 ensure_db_ready(&paths)?;
732 let conn = open_rw(&paths.db)?;
733 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
734 let memory_type_str = args.r#type.as_str().to_string();
735
736 let schema_tempfile = write_schema_tempfile()?;
738 let schema_path = schema_tempfile.path().to_path_buf();
739
740 let mut completed = 0usize;
741 let mut failed = 0usize;
742 let skipped_initial: usize = queue_conn
743 .query_row("SELECT COUNT(*) FROM queue WHERE status='done'", [], |r| {
744 r.get::<_, usize>(0)
745 })
746 .unwrap_or(0);
747 let skipped = skipped_initial;
748 let mut entities_total = 0usize;
749 let mut rels_total = 0usize;
750 let mut input_tokens_total = 0u64;
751 let mut output_tokens_total = 0u64;
752 let total = files.len();
753
754 let mut backoff_secs = args.rate_limit_wait;
755
756 loop {
757 let pending: Option<(i64, String)> = queue_conn
758 .query_row(
759 "UPDATE queue SET status='processing', attempt=attempt+1 \
760 WHERE id = (SELECT id FROM queue WHERE status='pending' ORDER BY id LIMIT 1) \
761 RETURNING id, file_path",
762 [],
763 |row| Ok((row.get(0)?, row.get(1)?)),
764 )
765 .ok();
766
767 let (queue_id, file_path) = match pending {
768 Some(p) => p,
769 None => break,
770 };
771
772 let file_started = Instant::now();
773
774 const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
776 if let Ok(meta) = std::fs::metadata(&file_path) {
777 if meta.len() > MAX_FILE_SIZE {
778 let err_msg = format!("file exceeds 10MB stdin limit ({} bytes)", meta.len());
779 let _ = queue_conn.execute(
780 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
781 rusqlite::params![err_msg, queue_id],
782 );
783 let current_index = completed + failed + skipped;
784 failed += 1;
785 emit_json(&FileEvent {
786 file: &file_path,
787 name: "",
788 status: "failed",
789 memory_id: None,
790 entities: None,
791 rels: None,
792 cost_usd: None,
793 input_tokens: None,
794 output_tokens: None,
795 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
796 error: Some(&err_msg),
797 index: current_index,
798 total,
799 });
800 if args.fail_fast {
801 break;
802 }
803 continue;
804 }
805 }
806
807 let file_content = match std::fs::read(&file_path) {
808 Ok(c) => c,
809 Err(e) => {
810 let err_msg = format!("IO error: {e}");
811 let _ = queue_conn.execute(
812 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
813 rusqlite::params![err_msg, queue_id],
814 );
815 let current_index = completed + failed + skipped;
816 failed += 1;
817 emit_json(&FileEvent {
818 file: &file_path,
819 name: "",
820 status: "failed",
821 memory_id: None,
822 entities: None,
823 rels: None,
824 cost_usd: None,
825 input_tokens: None,
826 output_tokens: None,
827 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
828 error: Some(&err_msg),
829 index: current_index,
830 total,
831 });
832 if args.fail_fast {
833 break;
834 }
835 continue;
836 }
837 };
838
839 let max_extract_attempts: u32 = 2;
841 let mut extraction_result: Option<(ExtractionResult, Option<CodexUsage>)> = None;
842 let mut last_extract_err: Option<String> = None;
843
844 for attempt in 1..=max_extract_attempts {
845 match extract_with_codex(
846 &codex_binary,
847 &file_content,
848 args.codex_model.as_deref(),
849 args.codex_timeout,
850 &schema_path,
851 ) {
852 Ok(result) => {
853 extraction_result = Some(result);
854 break;
855 }
856 Err(ref e) if format!("{e}").contains("RATE_LIMITED") => {
857 last_extract_err = Some(format!("{e}"));
858 break;
859 }
860 Err(e) => {
861 let msg = format!("{e}");
862 if attempt < max_extract_attempts {
863 tracing::warn!(
864 target: "ingest",
865 attempt,
866 error = %msg,
867 "codex extraction failed, retrying"
868 );
869 std::thread::sleep(std::time::Duration::from_secs(2));
870 }
871 last_extract_err = Some(msg);
872 }
873 }
874 }
875
876 if let Some((extraction, usage)) = extraction_result {
877 backoff_secs = args.rate_limit_wait;
878
879 let in_tok = usage.as_ref().map(|u| u.input_tokens).unwrap_or(0);
880 let out_tok = usage.as_ref().map(|u| u.output_tokens).unwrap_or(0);
881
882 let name = &extraction.name;
883 let ent_count = extraction.entities.len();
884 let rel_count = extraction.relationships.len();
885
886 let new_entities: Vec<NewEntity> = extraction
887 .entities
888 .iter()
889 .filter_map(|e| match e.entity_type.parse::<EntityType>() {
890 Ok(et) => Some(NewEntity {
891 name: e.name.clone(),
892 entity_type: et,
893 description: None,
894 }),
895 Err(_) => {
896 tracing::warn!(
897 target: "ingest",
898 entity = %e.name,
899 entity_type = %e.entity_type,
900 "entity type not recognized, skipping"
901 );
902 None
903 }
904 })
905 .collect();
906
907 let new_relationships: Vec<NewRelationship> = extraction
908 .relationships
909 .iter()
910 .map(|r| NewRelationship {
911 source: r.source.clone(),
912 target: r.target.clone(),
913 relation: crate::parsers::normalize_relation(&r.relation),
914 strength: r.strength,
915 description: None,
916 })
917 .collect();
918
919 let body_str = String::from_utf8_lossy(&file_content);
920 let body_hash = blake3::hash(body_str.as_bytes()).to_hex().to_string();
921 let new_memory = NewMemory {
922 name: name.clone(),
923 namespace: namespace.clone(),
924 memory_type: memory_type_str.clone(),
925 description: extraction.description.clone(),
926 body: body_str.to_string(),
927 body_hash,
928 session_id: None,
929 source: "agent".to_string(),
930 metadata: serde_json::Value::Object(serde_json::Map::new()),
931 };
932
933 let memory_id = match memories::find_by_name_any_state(&conn, &namespace, name)? {
935 Some((existing_id, is_deleted)) => {
936 if is_deleted {
937 memories::clear_deleted_at(&conn, existing_id)?;
938 }
939 let (old_name, old_desc, old_body): (String, String, String) = conn.query_row(
940 "SELECT name, COALESCE(description,''), COALESCE(body,'') FROM memories WHERE id=?1",
941 rusqlite::params![existing_id],
942 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
943 )?;
944 memories::update(&conn, existing_id, &new_memory, None)?;
945 memories::sync_fts_after_update(
946 &conn,
947 existing_id,
948 &old_name,
949 &old_desc,
950 &old_body,
951 &new_memory.name,
952 &new_memory.description,
953 &new_memory.body,
954 )?;
955 tracing::info!(target: "ingest", name, memory_id = existing_id, "updated existing memory (force-merge)");
956 existing_id
957 }
958 None => match memories::insert(&conn, &new_memory) {
959 Ok(id) => id,
960 Err(e) => {
961 let err_msg = format!("{e}");
962 let _ = queue_conn.execute(
963 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
964 rusqlite::params![err_msg, queue_id],
965 );
966 let current_index = completed + failed + skipped;
967 failed += 1;
968 emit_json(&FileEvent {
969 file: &file_path,
970 name,
971 status: "failed",
972 memory_id: None,
973 entities: None,
974 rels: None,
975 cost_usd: None,
976 input_tokens: Some(in_tok),
977 output_tokens: Some(out_tok),
978 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
979 error: Some(&err_msg),
980 index: current_index,
981 total,
982 });
983 input_tokens_total += in_tok;
984 output_tokens_total += out_tok;
985 if args.fail_fast {
986 break;
987 }
988 continue;
989 }
990 },
991 };
992
993 for ent in &new_entities {
994 if let Ok(eid) = entities::upsert_entity(&conn, &namespace, ent) {
995 let _ = entities::link_memory_entity(&conn, memory_id, eid);
996 }
997 }
998 for rel in &new_relationships {
999 crate::parsers::warn_if_non_canonical(&rel.relation);
1000 let src_id = entities::find_entity_id(&conn, &namespace, &rel.source);
1001 let tgt_id = entities::find_entity_id(&conn, &namespace, &rel.target);
1002 if let (Ok(Some(sid)), Ok(Some(tid))) = (src_id, tgt_id) {
1003 let _ = conn.execute(
1004 "INSERT OR IGNORE INTO relationships (namespace, source_id, target_id, relation, weight) VALUES (?1, ?2, ?3, ?4, ?5)",
1005 rusqlite::params![namespace, sid, tid, rel.relation, rel.strength],
1006 );
1007 }
1008 }
1009
1010 let _ = queue_conn.execute(
1011 "UPDATE queue SET status='done', name=?1, memory_id=?2, entities=?3, rels=?4, \
1012 input_tokens=?5, output_tokens=?6, elapsed_ms=?7, done_at=datetime('now') WHERE id=?8",
1013 rusqlite::params![
1014 name,
1015 memory_id,
1016 ent_count,
1017 rel_count,
1018 in_tok,
1019 out_tok,
1020 file_started.elapsed().as_millis() as i64,
1021 queue_id
1022 ],
1023 );
1024
1025 let current_index = completed + failed + skipped;
1026 completed += 1;
1027 entities_total += ent_count;
1028 rels_total += rel_count;
1029 input_tokens_total += in_tok;
1030 output_tokens_total += out_tok;
1031
1032 emit_json(&FileEvent {
1033 file: &file_path,
1034 name,
1035 status: "done",
1036 memory_id: Some(memory_id),
1037 entities: Some(ent_count),
1038 rels: Some(rel_count),
1039 cost_usd: None,
1040 input_tokens: Some(in_tok),
1041 output_tokens: Some(out_tok),
1042 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1043 error: None,
1044 index: current_index,
1045 total,
1046 });
1047 } else if let Some(ref err_str) = last_extract_err {
1048 if err_str.contains("RATE_LIMITED") {
1049 tracing::warn!(
1050 target: "ingest",
1051 wait_seconds = backoff_secs,
1052 "rate limited by Codex API, waiting before retry"
1053 );
1054 let _ = queue_conn.execute(
1055 "UPDATE queue SET status='pending' WHERE id=?1",
1056 rusqlite::params![queue_id],
1057 );
1058 std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
1059 backoff_secs = (backoff_secs * 2).min(900);
1060 continue;
1061 } else {
1062 let _ = queue_conn.execute(
1063 "UPDATE queue SET status='failed', error=?1, done_at=datetime('now') WHERE id=?2",
1064 rusqlite::params![err_str, queue_id],
1065 );
1066 let current_index = completed + failed + skipped;
1067 failed += 1;
1068 emit_json(&FileEvent {
1069 file: &file_path,
1070 name: "",
1071 status: "failed",
1072 memory_id: None,
1073 entities: None,
1074 rels: None,
1075 cost_usd: None,
1076 input_tokens: None,
1077 output_tokens: None,
1078 elapsed_ms: Some(file_started.elapsed().as_millis() as u64),
1079 error: Some(err_str),
1080 index: current_index,
1081 total,
1082 });
1083 if args.fail_fast {
1084 break;
1085 }
1086 }
1087 }
1088 }
1089
1090 let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);");
1092
1093 emit_json(&Summary {
1095 summary: true,
1096 files_total: total,
1097 completed,
1098 failed,
1099 skipped,
1100 entities_total,
1101 rels_total,
1102 input_tokens_total,
1103 output_tokens_total,
1104 elapsed_ms: started.elapsed().as_millis() as u64,
1105 });
1106
1107 if !args.keep_queue && failed == 0 {
1108 let _ = std::fs::remove_file(&args.queue_db);
1109 }
1110
1111 Ok(())
1112}
1113
/// Unit tests for the pure parts of the Codex ingest path: JSONL parsing, usage
/// deserialization, and the extraction schema/prompt constants.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `item.completed` event carrying an `agent_message` with `text`.
    fn make_agent_message_event(text: &str) -> String {
        format!(
            r#"{{"type":"item.completed","item":{{"id":"item_0","type":"agent_message","text":{}}}}}"#,
            serde_json::to_string(text).unwrap()
        )
    }

    /// Builds a `turn.completed` event with the given token counts.
    fn make_usage_event(input: u64, output: u64) -> String {
        format!(
            r#"{{"type":"turn.completed","usage":{{"input_tokens":{input},"output_tokens":{output}}}}}"#
        )
    }

    /// A minimal agent message that deserializes into an `ExtractionResult`.
    fn valid_extraction_json() -> String {
        r#"{"name":"test-module","description":"A test module for unit testing purposes","entities":[{"name":"test-entity","entity_type":"concept"}],"relationships":[{"source":"test-entity","target":"test-module","relation":"applies-to","strength":0.8}]}"#.to_string()
    }

    #[test]
    fn test_parse_codex_output_valid() {
        let jsonl = format!(
            "{}\n{}\n{}",
            r#"{"type":"thread.started","thread_id":"t1"}"#,
            make_agent_message_event(&valid_extraction_json()),
            make_usage_event(100, 50),
        );

        let (result, usage) = parse_codex_output(&jsonl).expect("parse must succeed");
        assert_eq!(result.name, "test-module");
        assert_eq!(result.entities.len(), 1);
        assert_eq!(result.relationships.len(), 1);
        let u = usage.expect("usage must be present");
        assert_eq!(u.input_tokens, 100);
        assert_eq!(u.output_tokens, 50);
    }

    #[test]
    fn test_parse_codex_output_turn_failed() {
        let jsonl = format!(
            "{}\n{}",
            r#"{"type":"thread.started","thread_id":"t1"}"#,
            r#"{"type":"turn.failed","error":{"message":"model error occurred"}}"#,
        );

        let err = parse_codex_output(&jsonl).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("turn failed"),
            "expected 'turn failed' in: {msg}"
        );
        assert!(msg.contains("model error occurred"));
    }

    #[test]
    fn test_parse_codex_output_rate_limit() {
        let jsonl = r#"{"type":"turn.failed","error":{"message":"rate_limit exceeded, 429 Too Many Requests"}}"#;

        let err = parse_codex_output(jsonl).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("RATE_LIMITED"),
            "expected 'RATE_LIMITED' in: {msg}"
        );
    }

    #[test]
    fn test_parse_codex_output_schema_error() {
        let jsonl = r#"{"type":"error","message":"invalid_json_schema: additional properties not allowed"}"#;

        let err = parse_codex_output(jsonl).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("invalid_json_schema") || msg.contains("schema"),
            "expected schema error in: {msg}"
        );
    }

    #[test]
    fn test_parse_codex_output_without_agent_message_fails() {
        let jsonl = format!(
            "{}\n{}",
            r#"{"type":"thread.started","thread_id":"t1"}"#,
            make_usage_event(10, 5),
        );

        let msg = format!("{}", parse_codex_output(&jsonl).unwrap_err());
        assert!(msg.contains("no agent_message"), "unexpected error: {msg}");
    }

    #[test]
    fn test_parse_codex_output_empty_input_fails() {
        assert!(parse_codex_output("").is_err());
        assert!(parse_codex_output("\n  \n").is_err());
    }

    #[test]
    fn test_parse_codex_output_rejects_non_json_agent_message() {
        let jsonl = make_agent_message_event("this is not json");

        let msg = format!("{}", parse_codex_output(&jsonl).unwrap_err());
        assert!(
            msg.contains("failed to parse codex agent_message"),
            "unexpected error: {msg}"
        );
    }

    #[test]
    fn test_parse_codex_output_turn_failure_overrides_agent_message() {
        let jsonl = format!(
            "{}\n{}",
            make_agent_message_event(&valid_extraction_json()),
            r#"{"type":"turn.failed","error":{"message":"boom"}}"#,
        );

        let msg = format!("{}", parse_codex_output(&jsonl).unwrap_err());
        assert!(msg.contains("boom"), "unexpected error: {msg}");
    }

    #[test]
    fn test_parse_codex_output_last_usage_wins() {
        let jsonl = format!(
            "{}\n{}\n{}",
            make_usage_event(1, 2),
            make_agent_message_event(&valid_extraction_json()),
            make_usage_event(30, 40),
        );

        let (_, usage) = parse_codex_output(&jsonl).expect("parse must succeed");
        let u = usage.expect("usage must be present");
        assert_eq!(u.input_tokens, 30);
        assert_eq!(u.output_tokens, 40);
    }

    #[test]
    fn test_codex_usage_optional_fields_default_to_zero() {
        let usage: CodexUsage = serde_json::from_str(r#"{"input_tokens":7,"output_tokens":3}"#)
            .expect("usage must parse without optional fields");
        assert_eq!(usage.input_tokens, 7);
        assert_eq!(usage.output_tokens, 3);
        assert_eq!(usage.cached_input_tokens, 0);
        assert_eq!(usage.reasoning_output_tokens, 0);
    }

    #[test]
    fn test_extraction_schema_codex_valid_json() {
        let _: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA_CODEX).expect("schema must be valid JSON");
    }

    #[test]
    fn test_extraction_schema_codex_has_additional_properties_false() {
        let schema: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA_CODEX).expect("schema must be valid JSON");

        assert_eq!(
            schema["additionalProperties"].as_bool(),
            Some(false),
            "root must have additionalProperties: false"
        );

        assert_eq!(
            schema["properties"]["entities"]["items"]["additionalProperties"].as_bool(),
            Some(false),
            "entity items must have additionalProperties: false"
        );

        assert_eq!(
            schema["properties"]["relationships"]["items"]["additionalProperties"].as_bool(),
            Some(false),
            "relationship items must have additionalProperties: false"
        );
    }

    #[test]
    fn test_schema_relation_enum_matches_prompt() {
        // The prompt tells the model which relation types are allowed; keep it in sync with
        // the schema enum that Codex actually enforces.
        let schema: serde_json::Value =
            serde_json::from_str(EXTRACTION_SCHEMA_CODEX).expect("schema must be valid JSON");
        let relations = schema["properties"]["relationships"]["items"]["properties"]["relation"]
            ["enum"]
            .as_array()
            .expect("relation enum must be an array");
        assert!(!relations.is_empty());
        for relation in relations {
            let relation = relation.as_str().expect("relation values must be strings");
            assert!(
                EXTRACTION_PROMPT.contains(relation),
                "prompt does not mention relation {relation}"
            );
        }
    }

    #[test]
    fn test_parse_codex_output_last_agent_message_wins() {
        let first_text = r#"{"name":"first-result","description":"First result should be ignored","entities":[],"relationships":[]}"#;
        let second_text = r#"{"name":"final-result","description":"Final result wins over earlier ones","entities":[{"name":"final-entity","entity_type":"concept"}],"relationships":[]}"#;

        let jsonl = format!(
            "{}\n{}\n{}\n{}",
            r#"{"type":"thread.started","thread_id":"t1"}"#,
            make_agent_message_event(first_text),
            make_agent_message_event(second_text),
            make_usage_event(200, 80),
        );

        let (result, _) = parse_codex_output(&jsonl).expect("parse must succeed");
        assert_eq!(result.name, "final-result", "last agent_message should win");
        assert_eq!(result.entities.len(), 1);
    }

    #[test]
    fn test_parse_codex_output_skips_malformed_lines() {
        let jsonl = format!(
            "not json at all\n{}\n{{broken\n{}",
            make_agent_message_event(&valid_extraction_json()),
            make_usage_event(10, 5),
        );

        let (result, _) = parse_codex_output(&jsonl).expect("malformed lines must be skipped");
        assert_eq!(result.name, "test-module");
    }
}