1#![cfg_attr(docsrs, feature(doc_cfg))]
45
46use std::path::{Path, PathBuf};
47use std::sync::{Arc, Mutex};
48use std::time::Duration;
49
50use async_trait::async_trait;
51use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
52use tokio::process::{Child, Command};
53use tokio::sync::Mutex as AsyncMutex;
54use uuid::Uuid;
55
56use nucel_agent_core::{
57 AgentCapabilities, AgentCost, AgentError, AgentExecutor, AgentResponse, AgentSession,
58 AvailabilityStatus, EventStream, ExecutorType, MessageEvent, PermissionMode, Result,
59 SessionImpl, SpawnConfig,
60};
61
62const DEFAULT_TIMEOUT_SECS: u64 = 600;
64
65const STDERR_BUFFER_BYTES: usize = 4096;
67
68pub struct CodexExecutor {
70 api_key: Option<String>,
71}
72
73impl CodexExecutor {
74 pub fn new() -> Self {
75 Self { api_key: None }
76 }
77
78 pub fn with_api_key(api_key: impl Into<String>) -> Self {
79 Self {
80 api_key: Some(api_key.into()),
81 }
82 }
83
84 fn check_cli_available() -> bool {
85 std::process::Command::new("which")
86 .arg("codex")
87 .stdout(std::process::Stdio::null())
88 .stderr(std::process::Stdio::null())
89 .status()
90 .map(|s| s.success())
91 .unwrap_or(false)
92 }
93}
94
95impl Default for CodexExecutor {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
101pub(crate) fn parse_codex_line(line: &str) -> Result<Option<CodexEvent>> {
104 let v: serde_json::Value =
105 serde_json::from_str(line).map_err(|e| AgentError::Provider {
106 provider: "codex".into(),
107 message: format!("JSON parse error: {e}"),
108 })?;
109
110 let event_type = v.get("type").and_then(|t| t.as_str()).unwrap_or("");
111
112 match event_type {
113 "thread.started" => {
114 let thread_id = v
115 .get("thread_id")
116 .and_then(|t| t.as_str())
117 .unwrap_or("")
118 .to_string();
119 Ok(Some(CodexEvent::ThreadStarted { thread_id }))
120 }
121 "turn.started" => Ok(Some(CodexEvent::TurnStarted)),
122 "item.completed" => {
123 let item = &v["item"];
124 let item_type = item.get("type").and_then(|t| t.as_str()).unwrap_or("");
125 match item_type {
126 "agent_message" => {
127 let text = item
128 .get("text")
129 .and_then(|t| t.as_str())
130 .unwrap_or("")
131 .to_string();
132 Ok(Some(CodexEvent::Message(text)))
133 }
134 "reasoning" | "command_execution" | "file_change" | "mcp_tool_call" => {
135 tracing::debug!(item_type = %item_type, "codex item completed");
136 Ok(Some(CodexEvent::Other))
137 }
138 _ => Ok(Some(CodexEvent::Other)),
139 }
140 }
141 "turn.completed" => {
142 let usage = v
145 .get("usage")
146 .or_else(|| v.get("token_usage"))
147 .cloned()
148 .unwrap_or(serde_json::Value::Null);
149 let input_tokens = usage
150 .get("input_tokens")
151 .and_then(|v| v.as_u64())
152 .unwrap_or(0);
153 let output_tokens = usage
154 .get("output_tokens")
155 .and_then(|v| v.as_u64())
156 .unwrap_or(0);
157 Ok(Some(CodexEvent::TurnCompleted {
158 input_tokens,
159 output_tokens,
160 }))
161 }
162 "turn.failed" => {
163 let error_msg = v
164 .get("error")
165 .and_then(|e| e.get("message"))
166 .and_then(|m| m.as_str())
167 .unwrap_or("unknown error")
168 .to_string();
169 Ok(Some(CodexEvent::Error(error_msg)))
170 }
171 "error" => {
172 let error_msg = v
173 .get("message")
174 .and_then(|m| m.as_str())
175 .unwrap_or("unknown error")
176 .to_string();
177 Ok(Some(CodexEvent::Error(error_msg)))
178 }
179 _ => Ok(Some(CodexEvent::Other)),
180 }
181}
182
183#[derive(Debug)]
184pub(crate) enum CodexEvent {
185 ThreadStarted { thread_id: String },
186 TurnStarted,
187 Message(String),
188 TurnCompleted {
189 input_tokens: u64,
190 output_tokens: u64,
191 },
192 Error(String),
193 Other,
194}
195
196pub(crate) fn permission_to_codex_args(cmd: &mut Command, mode: Option<PermissionMode>) {
198 match mode {
199 Some(PermissionMode::BypassPermissions) => {
200 cmd.arg("--dangerously-bypass-approvals-and-sandbox");
201 }
202 Some(PermissionMode::AcceptEdits) => {
203 cmd.arg("--sandbox").arg("workspace-write");
206 }
207 Some(PermissionMode::RejectAll) => {
208 cmd.arg("--sandbox").arg("read-only");
209 }
210 Some(PermissionMode::DontAsk) => {
211 cmd.arg("--sandbox").arg("read-only");
213 }
214 Some(PermissionMode::Auto) | Some(PermissionMode::Prompt) | None => {
215 cmd.arg("--sandbox").arg("workspace-write");
217 }
218 Some(_) => {
220 cmd.arg("--sandbox").arg("workspace-write");
221 }
222 }
223}
224
225struct CodexRunOutput {
227 content: String,
228 cost: AgentCost,
229 thread_id: String,
230}
231
232async fn run_codex(
234 working_dir: &Path,
235 prompt: &str,
236 config: &SpawnConfig,
237 api_key: Option<&str>,
238 resume_thread_id: Option<&str>,
239) -> Result<CodexRunOutput> {
240 let mut cmd = Command::new("codex");
241 cmd.current_dir(working_dir);
242 cmd.arg("exec");
243
244 if let Some(tid) = resume_thread_id {
246 cmd.arg("resume").arg(tid);
247 }
248
249 cmd.arg("--json"); cmd.arg("--skip-git-repo-check");
251 cmd.arg("--color").arg("never");
253
254 if let Some(model) = &config.model {
256 cmd.arg("--model").arg(model);
257 }
258
259 permission_to_codex_args(&mut cmd, config.permission_mode);
261
262 cmd.arg("--cd").arg(working_dir);
264
265 cmd.arg(prompt);
267
268 if let Some(key) = api_key {
270 cmd.env("OPENAI_API_KEY", key);
271 }
272 for (k, v) in &config.env {
273 cmd.env(k, v);
274 }
275
276 let mut child = cmd
277 .stdout(std::process::Stdio::piped())
278 .stderr(std::process::Stdio::piped())
279 .spawn()
280 .map_err(|e| {
281 if e.kind() == std::io::ErrorKind::NotFound {
282 AgentError::CliNotFound {
283 cli_name: "codex".to_string(),
284 }
285 } else {
286 AgentError::Io(e)
287 }
288 })?;
289
290 let stdout = child.stdout.take().ok_or_else(|| AgentError::Provider {
291 provider: "codex".into(),
292 message: "failed to capture stdout".into(),
293 })?;
294
295 let stderr_buf: Arc<AsyncMutex<String>> = Arc::new(AsyncMutex::new(String::new()));
298 if let Some(err) = child.stderr.take() {
299 let buf = stderr_buf.clone();
300 tokio::spawn(drain_stderr(err, buf));
301 }
302
303 let mut reader = BufReader::new(stdout);
304 let mut line = String::new();
305 let mut content = String::new();
306 let mut cost = AgentCost::default();
307 let mut thread_id = String::new();
308 let mut had_error = false;
309 let mut error_msg = String::new();
310
311 let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
312
313 let read_loop = async {
314 loop {
315 line.clear();
316 let bytes = reader.read_line(&mut line).await.map_err(AgentError::Io)?;
317 if bytes == 0 {
318 break;
319 }
320
321 let trimmed = line.trim();
322 if trimmed.is_empty() {
323 continue;
324 }
325
326 match parse_codex_line(trimmed) {
327 Ok(Some(CodexEvent::ThreadStarted { thread_id: tid })) => {
328 thread_id = tid;
329 tracing::debug!(thread_id = %thread_id, "codex thread started");
330 }
331 Ok(Some(CodexEvent::TurnStarted)) => {
332 tracing::debug!("codex turn started");
333 }
334 Ok(Some(CodexEvent::Message(text))) => {
335 if !content.is_empty() {
336 content.push('\n');
337 }
338 content.push_str(&text);
339 }
340 Ok(Some(CodexEvent::TurnCompleted {
341 input_tokens,
342 output_tokens,
343 })) => {
344 cost.input_tokens = input_tokens;
345 cost.output_tokens = output_tokens;
346 }
347 Ok(Some(CodexEvent::Error(msg))) => {
348 had_error = true;
349 error_msg = msg;
350 }
351 Ok(Some(CodexEvent::Other)) => {}
352 Ok(None) => {}
353 Err(e) => {
354 tracing::warn!(error = %e, "failed to parse codex line");
355 }
356 }
357 }
358 Ok::<(), AgentError>(())
359 };
360
361 let result = tokio::time::timeout(timeout, read_loop).await;
362
363 match result {
364 Ok(Ok(())) => {
365 let _ = child.wait().await;
367 }
368 Ok(Err(e)) => {
369 let _ = child.kill().await;
370 return Err(e);
371 }
372 Err(_) => {
373 let _ = child.kill().await;
375 let _ = child.wait().await;
376 let tail = stderr_buf.lock().await.clone();
377 return Err(AgentError::Provider {
378 provider: "codex".into(),
379 message: format!(
380 "timed out after {}s{}",
381 timeout.as_secs(),
382 fmt_stderr_tail(&tail)
383 ),
384 });
385 }
386 }
387
388 if had_error {
389 let tail = stderr_buf.lock().await.clone();
390 return Err(AgentError::Provider {
391 provider: "codex".into(),
392 message: format!("codex error: {error_msg}{}", fmt_stderr_tail(&tail)),
393 });
394 }
395
396 Ok(CodexRunOutput {
397 content,
398 cost,
399 thread_id,
400 })
401}
402
403fn fmt_stderr_tail(tail: &str) -> String {
405 if tail.is_empty() {
406 String::new()
407 } else {
408 format!(" (stderr: {})", tail.trim())
409 }
410}
411
412async fn drain_stderr(
414 stderr: tokio::process::ChildStderr,
415 buf: Arc<AsyncMutex<String>>,
416) {
417 let mut reader = BufReader::new(stderr);
418 let mut chunk = vec![0u8; 1024];
419 loop {
420 match reader.read(&mut chunk).await {
421 Ok(0) => break,
422 Ok(n) => {
423 let s = String::from_utf8_lossy(&chunk[..n]).to_string();
424 let mut guard = buf.lock().await;
425 guard.push_str(&s);
426 let len = guard.len();
427 if len > STDERR_BUFFER_BYTES {
428 let drop_to = len - STDERR_BUFFER_BYTES;
429 let mut idx = drop_to;
430 while idx < len && !guard.is_char_boundary(idx) {
431 idx += 1;
432 }
433 *guard = guard[idx..].to_string();
434 }
435 }
436 Err(_) => break,
437 }
438 }
439}
440
441struct CodexSessionImpl {
443 cost: Arc<Mutex<AgentCost>>,
444 budget: f64,
445 working_dir: PathBuf,
446 config: SpawnConfig,
447 api_key: Option<String>,
448 thread_id: Arc<Mutex<String>>,
450}
451
452#[async_trait]
453impl SessionImpl for CodexSessionImpl {
454 async fn query(&self, prompt: &str) -> Result<AgentResponse> {
455 {
456 let c = self.cost.lock().unwrap();
457 if c.total_usd >= self.budget {
458 return Err(AgentError::BudgetExceeded {
459 limit: self.budget,
460 spent: c.total_usd,
461 });
462 }
463 }
464
465 let resume_id = {
466 let g = self.thread_id.lock().unwrap();
467 if g.is_empty() {
468 None
469 } else {
470 Some(g.clone())
471 }
472 };
473
474 let out = run_codex(
475 &self.working_dir,
476 prompt,
477 &self.config,
478 self.api_key.as_deref(),
479 resume_id.as_deref(),
480 )
481 .await?;
482
483 if !out.thread_id.is_empty() {
485 let mut g = self.thread_id.lock().unwrap();
486 *g = out.thread_id;
487 }
488
489 {
490 let mut c = self.cost.lock().unwrap();
491 c.input_tokens += out.cost.input_tokens;
492 c.output_tokens += out.cost.output_tokens;
493 c.total_usd += out.cost.total_usd;
494 }
495
496 Ok(AgentResponse {
497 content: out.content,
498 cost: out.cost,
499 ..Default::default()
500 })
501 }
502
503 async fn query_stream(&self, prompt: &str) -> Result<EventStream> {
504 {
505 let c = self.cost.lock().unwrap();
506 if c.total_usd >= self.budget {
507 return Err(AgentError::BudgetExceeded {
508 limit: self.budget,
509 spent: c.total_usd,
510 });
511 }
512 }
513 let resume_id = {
514 let g = self.thread_id.lock().unwrap();
515 if g.is_empty() { None } else { Some(g.clone()) }
516 };
517 let working_dir = self.working_dir.clone();
518 let config = self.config.clone();
519 let api_key = self.api_key.clone();
520 let cost_handle = self.cost.clone();
521 let thread_handle = self.thread_id.clone();
522 let budget = self.budget;
523 let prompt_owned = prompt.to_string();
524
525 let (tx, rx) = tokio::sync::mpsc::channel::<Result<MessageEvent>>(64);
526
527 tokio::spawn(async move {
528 stream_codex(
529 &working_dir,
530 &prompt_owned,
531 &config,
532 api_key.as_deref(),
533 resume_id.as_deref(),
534 budget,
535 cost_handle,
536 thread_handle,
537 tx,
538 )
539 .await;
540 });
541
542 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
543 Ok(Box::pin(stream))
544 }
545
546 async fn total_cost(&self) -> Result<AgentCost> {
547 Ok(self.cost.lock().unwrap().clone())
548 }
549
550 async fn close(&self) -> Result<()> {
551 Ok(())
552 }
553}
554
555#[allow(clippy::too_many_arguments)]
557async fn stream_codex(
558 working_dir: &Path,
559 prompt: &str,
560 config: &SpawnConfig,
561 api_key: Option<&str>,
562 resume_thread_id: Option<&str>,
563 budget: f64,
564 cost_handle: Arc<Mutex<AgentCost>>,
565 thread_handle: Arc<Mutex<String>>,
566 tx: tokio::sync::mpsc::Sender<Result<MessageEvent>>,
567) {
568 let mut cmd = Command::new("codex");
569 cmd.current_dir(working_dir);
570 cmd.arg("exec");
571 if let Some(tid) = resume_thread_id {
572 cmd.arg("resume").arg(tid);
573 }
574 cmd.arg("--json");
575 cmd.arg("--skip-git-repo-check");
576 cmd.arg("--color").arg("never");
577 if let Some(model) = &config.model {
578 cmd.arg("--model").arg(model);
579 }
580 permission_to_codex_args(&mut cmd, config.permission_mode);
581 cmd.arg("--cd").arg(working_dir);
582 cmd.arg(prompt);
583 if let Some(key) = api_key {
584 cmd.env("OPENAI_API_KEY", key);
585 }
586 for (k, v) in &config.env {
587 cmd.env(k, v);
588 }
589
590 let mut child = match cmd
591 .stdout(std::process::Stdio::piped())
592 .stderr(std::process::Stdio::piped())
593 .spawn()
594 {
595 Ok(c) => c,
596 Err(e) => {
597 let err = if e.kind() == std::io::ErrorKind::NotFound {
598 AgentError::CliNotFound { cli_name: "codex".into() }
599 } else {
600 AgentError::Io(e)
601 };
602 let _ = tx.send(Err(err)).await;
603 return;
604 }
605 };
606
607 let stdout = match child.stdout.take() {
608 Some(s) => s,
609 None => {
610 let _ = tx
611 .send(Err(AgentError::Provider {
612 provider: "codex".into(),
613 message: "failed to capture stdout".into(),
614 }))
615 .await;
616 return;
617 }
618 };
619
620 let stderr_buf: Arc<AsyncMutex<String>> = Arc::new(AsyncMutex::new(String::new()));
622 if let Some(err) = child.stderr.take() {
623 let buf = stderr_buf.clone();
624 tokio::spawn(drain_stderr(err, buf));
625 }
626
627 let mut reader = BufReader::new(stdout);
628 let mut line = String::new();
629 let mut input_tokens = 0_u64;
630 let mut output_tokens = 0_u64;
631 let mut content = String::new();
632 let mut thread_id_local = String::new();
633 let mut saw_terminal = false;
634 let mut had_error: Option<String> = None;
635
636 let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
637
638 let result = tokio::time::timeout(timeout, async {
639 loop {
640 line.clear();
641 let n = match reader.read_line(&mut line).await {
642 Ok(n) => n,
643 Err(e) => {
644 let _ = tx.send(Err(AgentError::Io(e))).await;
645 return;
646 }
647 };
648 if n == 0 { break; }
649 let trimmed = line.trim();
650 if trimmed.is_empty() { continue; }
651 match parse_codex_line(trimmed) {
652 Ok(Some(CodexEvent::ThreadStarted { thread_id })) => {
653 thread_id_local = thread_id;
654 }
655 Ok(Some(CodexEvent::TurnStarted)) => {}
656 Ok(Some(CodexEvent::Message(text))) => {
657 if !content.is_empty() { content.push('\n'); }
658 content.push_str(&text);
659 let _ = tx.send(Ok(MessageEvent::TextChunk { text })).await;
660 }
661 Ok(Some(CodexEvent::TurnCompleted { input_tokens: i, output_tokens: o })) => {
662 input_tokens = i;
663 output_tokens = o;
664 }
665 Ok(Some(CodexEvent::Error(msg))) => {
666 had_error = Some(msg);
667 }
668 Ok(Some(CodexEvent::Other)) | Ok(None) => {}
669 Err(_) => {}
670 }
671 }
672 }).await;
673
674 if result.is_err() {
675 let _ = child.kill().await;
676 let _ = child.wait().await;
677 let tail = stderr_buf.lock().await.clone();
678 let _ = tx
679 .send(Err(AgentError::Provider {
680 provider: "codex".into(),
681 message: format!("stream timed out after {}s{}", timeout.as_secs(), fmt_stderr_tail(&tail)),
682 }))
683 .await;
684 return;
685 } else {
686 let _ = child.wait().await;
687 }
688
689 if let Some(msg) = had_error {
690 let tail = stderr_buf.lock().await.clone();
691 let _ = tx
692 .send(Err(AgentError::Provider {
693 provider: "codex".into(),
694 message: format!("codex error: {msg}{}", fmt_stderr_tail(&tail)),
695 }))
696 .await;
697 return;
698 }
699
700 let cost = AgentCost {
701 input_tokens,
702 output_tokens,
703 cache_read_tokens: 0,
704 cache_creation_tokens: 0,
705 total_usd: 0.0,
706 };
707 {
708 let mut c = cost_handle.lock().unwrap();
709 c.input_tokens += cost.input_tokens;
710 c.output_tokens += cost.output_tokens;
711 c.total_usd += cost.total_usd;
712 }
713 if !thread_id_local.is_empty() {
714 let mut g = thread_handle.lock().unwrap();
715 *g = thread_id_local;
716 }
717 if cost.total_usd > budget {
718 let _ = tx.send(Err(AgentError::BudgetExceeded { limit: budget, spent: cost.total_usd })).await;
719 return;
720 }
721 let _ = tx
722 .send(Ok(MessageEvent::ResultDone { cost, content, is_error: false }))
723 .await;
724 saw_terminal = true;
725 let _ = saw_terminal;
726}
727
728#[async_trait]
729impl AgentExecutor for CodexExecutor {
730 fn executor_type(&self) -> ExecutorType {
731 ExecutorType::Codex
732 }
733
734 async fn spawn(
735 &self,
736 working_dir: &Path,
737 prompt: &str,
738 config: &SpawnConfig,
739 ) -> Result<AgentSession> {
740 let cost = Arc::new(Mutex::new(AgentCost::default()));
741 let budget = config.budget_usd.unwrap_or(f64::MAX);
742
743 if budget <= 0.0 {
744 return Err(AgentError::BudgetExceeded {
745 limit: budget,
746 spent: 0.0,
747 });
748 }
749
750 let out = run_codex(working_dir, prompt, config, self.api_key.as_deref(), None).await?;
751
752 if out.cost.total_usd > budget {
753 return Err(AgentError::BudgetExceeded {
754 limit: budget,
755 spent: out.cost.total_usd,
756 });
757 }
758
759 let session_id = if out.thread_id.is_empty() {
763 Uuid::new_v4().to_string()
764 } else {
765 out.thread_id.clone()
766 };
767
768 {
769 let mut c = cost.lock().unwrap();
770 *c = out.cost.clone();
771 }
772
773 let inner = Arc::new(CodexSessionImpl {
774 cost: cost.clone(),
775 budget,
776 working_dir: working_dir.to_path_buf(),
777 config: config.clone(),
778 api_key: self.api_key.clone(),
779 thread_id: Arc::new(Mutex::new(out.thread_id.clone())),
780 });
781
782 Ok(AgentSession::new(
783 session_id,
784 ExecutorType::Codex,
785 working_dir.to_path_buf(),
786 config.model.clone(),
787 inner,
788 ))
789 }
790
791 async fn resume(
792 &self,
793 working_dir: &Path,
794 session_id: &str,
795 prompt: &str,
796 config: &SpawnConfig,
797 ) -> Result<AgentSession> {
798 let cost = Arc::new(Mutex::new(AgentCost::default()));
799 let budget = config.budget_usd.unwrap_or(f64::MAX);
800
801 if budget <= 0.0 {
802 return Err(AgentError::BudgetExceeded {
803 limit: budget,
804 spent: 0.0,
805 });
806 }
807
808 let out = run_codex(
810 working_dir,
811 prompt,
812 config,
813 self.api_key.as_deref(),
814 Some(session_id),
815 )
816 .await?;
817
818 if out.cost.total_usd > budget {
819 return Err(AgentError::BudgetExceeded {
820 limit: budget,
821 spent: out.cost.total_usd,
822 });
823 }
824
825 let resolved_thread_id = if out.thread_id.is_empty() {
827 session_id.to_string()
828 } else {
829 out.thread_id.clone()
830 };
831
832 {
833 let mut c = cost.lock().unwrap();
834 *c = out.cost.clone();
835 }
836
837 let inner = Arc::new(CodexSessionImpl {
838 cost: cost.clone(),
839 budget,
840 working_dir: working_dir.to_path_buf(),
841 config: config.clone(),
842 api_key: self.api_key.clone(),
843 thread_id: Arc::new(Mutex::new(resolved_thread_id.clone())),
844 });
845
846 Ok(AgentSession::new(
847 resolved_thread_id,
848 ExecutorType::Codex,
849 working_dir.to_path_buf(),
850 config.model.clone(),
851 inner,
852 ))
853 }
854
855 fn capabilities(&self) -> AgentCapabilities {
856 AgentCapabilities {
857 session_resume: true,
859 token_usage: true,
860 mcp_support: false,
861 autonomous_mode: true,
862 structured_output: false,
865 streaming: true,
866 hooks: false,
867 prompt_caching: false,
868 extended_thinking: false,
869 }
870 }
871
872 fn availability(&self) -> AvailabilityStatus {
873 if Self::check_cli_available() {
874 AvailabilityStatus {
875 available: true,
876 reason: None,
877 }
878 } else {
879 AvailabilityStatus {
880 available: false,
881 reason: Some(
882 "`codex` CLI not found. Install: npm install -g @openai/codex".to_string(),
883 ),
884 }
885 }
886 }
887}
888
889#[allow(dead_code)]
891fn _child_type_check(c: &Child) -> Option<u32> {
892 c.id()
893}
894
895#[cfg(test)]
896mod tests {
897 use super::*;
898
899 #[test]
900 fn executor_type_is_codex() {
901 let exec = CodexExecutor::new();
902 assert_eq!(exec.executor_type(), ExecutorType::Codex);
903 }
904
905 #[test]
906 fn capabilities_after_resume_landing() {
907 let caps = CodexExecutor::new().capabilities();
908 assert!(caps.autonomous_mode);
909 assert!(caps.token_usage);
910 assert!(!caps.mcp_support);
911 assert!(caps.session_resume, "Codex resume implemented via `codex exec resume`");
912 assert!(!caps.structured_output, "structured output not yet wired");
913 }
914
915 #[test]
916 fn parse_codex_thread_started() {
917 let line =
918 r#"{"type":"thread.started","thread_id":"019ce6ce-65fd-7530-8e6b-9ccce0436091"}"#;
919 let event = parse_codex_line(line).unwrap();
920 match event {
921 Some(CodexEvent::ThreadStarted { thread_id }) => {
922 assert_eq!(thread_id, "019ce6ce-65fd-7530-8e6b-9ccce0436091");
923 }
924 _ => panic!("expected ThreadStarted"),
925 }
926 }
927
928 #[test]
929 fn parse_codex_turn_started() {
930 let line = r#"{"type":"turn.started"}"#;
931 let event = parse_codex_line(line).unwrap();
932 assert!(matches!(event, Some(CodexEvent::TurnStarted)));
933 }
934
935 #[test]
936 fn parse_codex_message_event() {
937 let line = r#"{"type":"item.completed","item":{"id":"item_0","type":"agent_message","text":"Fixed the bug"}}"#;
938 let event = parse_codex_line(line).unwrap();
939 match event {
940 Some(CodexEvent::Message(text)) => assert_eq!(text, "Fixed the bug"),
941 _ => panic!("expected Message"),
942 }
943 }
944
945 #[test]
946 fn parse_codex_turn_completed_canonical_usage_key() {
947 let line =
949 r#"{"type":"turn.completed","usage":{"input_tokens":100,"output_tokens":50}}"#;
950 let event = parse_codex_line(line).unwrap();
951 match event {
952 Some(CodexEvent::TurnCompleted {
953 input_tokens,
954 output_tokens,
955 }) => {
956 assert_eq!(input_tokens, 100);
957 assert_eq!(output_tokens, 50);
958 }
959 _ => panic!("expected TurnCompleted"),
960 }
961 }
962
963 #[test]
964 fn parse_codex_turn_completed_legacy_token_usage_fallback() {
965 let line = r#"{"type":"turn.completed","token_usage":{"input_tokens":7,"output_tokens":11}}"#;
966 let event = parse_codex_line(line).unwrap();
967 match event {
968 Some(CodexEvent::TurnCompleted {
969 input_tokens,
970 output_tokens,
971 }) => {
972 assert_eq!(input_tokens, 7);
973 assert_eq!(output_tokens, 11);
974 }
975 _ => panic!("expected TurnCompleted"),
976 }
977 }
978
979 #[test]
980 fn parse_codex_turn_completed_prefers_usage_over_token_usage() {
981 let line = r#"{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":2},"token_usage":{"input_tokens":99,"output_tokens":99}}"#;
983 let event = parse_codex_line(line).unwrap();
984 match event {
985 Some(CodexEvent::TurnCompleted {
986 input_tokens,
987 output_tokens,
988 }) => {
989 assert_eq!(input_tokens, 1);
990 assert_eq!(output_tokens, 2);
991 }
992 _ => panic!("expected TurnCompleted"),
993 }
994 }
995
996 #[test]
997 fn parse_codex_error() {
998 let line = r#"{"type":"error","message":"Quota exceeded"}"#;
999 let event = parse_codex_line(line).unwrap();
1000 match event {
1001 Some(CodexEvent::Error(msg)) => assert!(msg.contains("Quota")),
1002 _ => panic!("expected Error"),
1003 }
1004 }
1005
1006 #[test]
1007 fn parse_codex_turn_failed() {
1008 let line = r#"{"type":"turn.failed","error":{"message":"Quota exceeded. Check your plan."}}"#;
1009 let event = parse_codex_line(line).unwrap();
1010 match event {
1011 Some(CodexEvent::Error(msg)) => assert!(msg.contains("Quota")),
1012 _ => panic!("expected Error"),
1013 }
1014 }
1015
1016 #[test]
1017 fn parse_unknown_type_returns_other() {
1018 let line = r#"{"type":"web_search","query":"test"}"#;
1019 let event = parse_codex_line(line).unwrap();
1020 assert!(matches!(event, Some(CodexEvent::Other)));
1021 }
1022
1023 #[test]
1024 fn permission_accept_edits_no_longer_uses_full_auto() {
1025 let mut cmd = Command::new("codex");
1027 permission_to_codex_args(&mut cmd, Some(PermissionMode::AcceptEdits));
1028 let dbg = format!("{:?}", cmd.as_std());
1029 assert!(
1030 !dbg.contains("--full-auto"),
1031 "should not pass --full-auto: {dbg}"
1032 );
1033 assert!(
1034 dbg.contains("--sandbox") && dbg.contains("workspace-write"),
1035 "should pass --sandbox workspace-write: {dbg}"
1036 );
1037 }
1038
1039 #[test]
1040 fn permission_dont_ask_maps_to_read_only_sandbox() {
1041 let mut cmd = Command::new("codex");
1042 permission_to_codex_args(&mut cmd, Some(PermissionMode::DontAsk));
1043 let dbg = format!("{:?}", cmd.as_std());
1044 assert!(dbg.contains("--sandbox") && dbg.contains("read-only"), "{dbg}");
1045 }
1046
1047 #[test]
1048 fn permission_bypass_uses_dangerous_flag() {
1049 let mut cmd = Command::new("codex");
1050 permission_to_codex_args(&mut cmd, Some(PermissionMode::BypassPermissions));
1051 let dbg = format!("{:?}", cmd.as_std());
1052 assert!(
1053 dbg.contains("--dangerously-bypass-approvals-and-sandbox"),
1054 "{dbg}"
1055 );
1056 }
1057}