1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::collections::HashSet;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::sync::Mutex;
7use std::sync::MutexGuard;
8use std::sync::atomic::AtomicU64;
9use std::time::Duration;
10
11use agcodex_apply_patch::ApplyPatchAction;
12use agcodex_apply_patch::MaybeApplyPatchVerified;
13use agcodex_apply_patch::maybe_parse_apply_patch_verified;
14use agcodex_login::CodexAuth;
15use agcodex_mcp_types::CallToolResult;
16use agcodex_protocol::protocol::TurnAbortReason;
17use agcodex_protocol::protocol::TurnAbortedEvent;
18use async_channel::Receiver;
19use async_channel::Sender;
20use futures::prelude::*;
21use serde::Serialize;
22use serde_json;
23use tokio::sync::oneshot;
24use tokio::task::AbortHandle;
25use tracing::debug;
26use tracing::error;
27use tracing::info;
28use tracing::trace;
29use tracing::warn;
30use uuid::Uuid;
31
32use crate::ModelProviderInfo;
33use crate::apply_patch;
34use crate::apply_patch::ApplyPatchExec;
35use crate::apply_patch::CODEX_APPLY_PATCH_ARG1;
36use crate::apply_patch::InternalApplyPatchInvocation;
37use crate::apply_patch::convert_apply_patch_to_protocol;
38use crate::client::ModelClient;
39use crate::client_common::Prompt;
40use crate::client_common::ResponseEvent;
41use crate::config::Config;
42use crate::config_types::ShellEnvironmentPolicy;
43use crate::conversation_history::ConversationHistory;
44use crate::environment_context::EnvironmentContext;
45use crate::error::CodexErr;
46use crate::error::Result as CodexResult;
47use crate::error::Result;
48use crate::error::SandboxErr;
49use crate::error::get_error_message_ui;
50use crate::exec::ExecParams;
51use crate::exec::ExecToolCallOutput;
52use crate::exec::SandboxType;
53use crate::exec::StdoutStream;
54use crate::exec::StreamOutput;
55use crate::exec::process_exec_tool_call;
56use crate::exec_env::create_env;
57use crate::mcp_connection_manager::McpConnectionManager;
58use crate::mcp_tool_call::handle_mcp_tool_call;
59use crate::model_family::find_family_for_model;
60use crate::models::ContentItem;
61use crate::models::FunctionCallOutputPayload;
62use crate::models::LocalShellAction;
63use crate::models::ReasoningItemContent;
64use crate::models::ReasoningItemReasoningSummary;
65use crate::models::ResponseInputItem;
66use crate::models::ResponseItem;
67use crate::models::ShellToolCallParams;
68use crate::openai_tools::ApplyPatchToolArgs;
69use crate::openai_tools::ToolsConfig;
70use crate::openai_tools::get_openai_tools;
71use crate::parse_command::parse_command;
72use crate::plan_tool::handle_update_plan;
73use crate::project_doc::get_user_instructions;
74use crate::protocol::AgentMessageDeltaEvent;
75use crate::protocol::AgentMessageEvent;
76use crate::protocol::AgentReasoningDeltaEvent;
77use crate::protocol::AgentReasoningEvent;
78use crate::protocol::AgentReasoningRawContentDeltaEvent;
79use crate::protocol::AgentReasoningRawContentEvent;
80use crate::protocol::AgentReasoningSectionBreakEvent;
81use crate::protocol::ApplyPatchApprovalRequestEvent;
82use crate::protocol::AskForApproval;
83use crate::protocol::BackgroundEventEvent;
84use crate::protocol::ErrorEvent;
85use crate::protocol::Event;
86use crate::protocol::EventMsg;
87use crate::protocol::ExecApprovalRequestEvent;
88use crate::protocol::ExecCommandBeginEvent;
89use crate::protocol::ExecCommandEndEvent;
90use crate::protocol::FileChange;
91use crate::protocol::InputItem;
92use crate::protocol::Op;
93use crate::protocol::PatchApplyBeginEvent;
94use crate::protocol::PatchApplyEndEvent;
95use crate::protocol::ReviewDecision;
96use crate::protocol::SandboxPolicy;
97use crate::protocol::SessionConfiguredEvent;
98use crate::protocol::Submission;
99use crate::protocol::TaskCompleteEvent;
100use crate::protocol::TurnDiffEvent;
101use crate::rollout::RolloutRecorder;
102use crate::safety::SafetyCheck;
103use crate::safety::assess_command_safety;
104use crate::safety::assess_safety_for_untrusted_command;
105use crate::shell;
106use crate::turn_diff_tracker::TurnDiffTracker;
107use crate::user_notification::UserNotification;
108use crate::util::backoff;
109use agcodex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
110use agcodex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
111
112trait MutexExt<T> {
117 fn lock_unchecked(&self) -> MutexGuard<'_, T>;
118}
119
120impl<T> MutexExt<T> for Mutex<T> {
121 fn lock_unchecked(&self) -> MutexGuard<'_, T> {
122 #[expect(clippy::expect_used)]
123 self.lock().expect("poisoned lock")
124 }
125}
126
/// Client-side handle to a spawned agent: submissions are sent in through
/// `tx_sub` and events are read back through `rx_event`.
pub struct Codex {
    /// Monotonic counter used by `submit` to derive a fresh submission id.
    next_id: AtomicU64,
    /// Sending half of the submission queue consumed by `submission_loop`.
    tx_sub: Sender<Submission>,
    /// Receiving half of the event channel that the session writes to.
    rx_event: Receiver<Event>,
}
134
/// Successful result of [`Codex::spawn`]: the handle itself plus the id of the
/// session that was created for it.
pub struct CodexSpawnOk {
    /// Handle used to submit operations and receive events.
    pub codex: Codex,
    /// Identifier of the session backing `codex`.
    pub session_id: Uuid,
}
142
/// Event id used for events emitted while the session is being set up in
/// `Session::new`, i.e. events that do not correspond to any submission.
pub(crate) const INITIAL_SUBMIT_ID: &str = "";
144
145impl Codex {
146 pub async fn spawn(config: Config, auth: Option<CodexAuth>) -> CodexResult<CodexSpawnOk> {
148 let (tx_sub, rx_sub) = async_channel::bounded(64);
149 let (tx_event, rx_event) = async_channel::unbounded();
150
151 let user_instructions = get_user_instructions(&config).await;
152
153 let config = Arc::new(config);
154 let resume_path = config.experimental_resume.clone();
155
156 let configure_session = ConfigureSession {
157 provider: config.model_provider.clone(),
158 model: config.model.clone(),
159 model_reasoning_effort: config.model_reasoning_effort,
160 model_reasoning_summary: config.model_reasoning_summary,
161 user_instructions,
162 base_instructions: config.base_instructions.clone(),
163 approval_policy: config.approval_policy,
164 sandbox_policy: config.sandbox_policy.clone(),
165 disable_response_storage: config.disable_response_storage,
166 notify: config.notify.clone(),
167 cwd: config.cwd.clone(),
168 resume_path,
169 };
170
171 let (session, turn_context) =
173 Session::new(configure_session, config.clone(), auth, tx_event.clone())
174 .await
175 .map_err(|e| {
176 error!("Failed to create session: {e:#}");
177 CodexErr::InternalAgentDied
178 })?;
179 let session_id = session.session_id;
180
181 tokio::spawn(submission_loop(session, turn_context, config, rx_sub));
183 let codex = Codex {
184 next_id: AtomicU64::new(0),
185 tx_sub,
186 rx_event,
187 };
188
189 Ok(CodexSpawnOk { codex, session_id })
190 }
191
192 pub async fn submit(&self, op: Op) -> CodexResult<String> {
194 let id = self
195 .next_id
196 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
197 .to_string();
198 let sub = Submission { id: id.clone(), op };
199 self.submit_with_id(sub).await?;
200 Ok(id)
201 }
202
203 pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
206 self.tx_sub
207 .send(sub)
208 .await
209 .map_err(|_| CodexErr::InternalAgentDied)?;
210 Ok(())
211 }
212
213 pub async fn next_event(&self) -> CodexResult<Event> {
214 let event = self
215 .rx_event
216 .recv()
217 .await
218 .map_err(|_| CodexErr::InternalAgentDied)?;
219 Ok(event)
220 }
221}
222
/// Mutable portion of a session, kept behind `Session::state`.
#[derive(Default)]
struct State {
    /// Commands recorded through `Session::add_approved_command`.
    approved_commands: HashSet<Vec<String>>,
    /// The task currently running for this session, if any.
    current_task: Option<AgentTask>,
    /// Approval requests awaiting a decision, keyed by submission id; the
    /// sender is resolved by `Session::notify_approval`.
    pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
    /// Input queued by `Session::inject_input` while a task is running and
    /// drained by `Session::get_pending_input`.
    pending_input: Vec<ResponseInputItem>,
    /// Conversation items recorded so far.
    history: ConversationHistory,
}
232
/// State shared by every task and turn of one agent session.
pub(crate) struct Session {
    /// Identifier of this session: restored from a resumed rollout or freshly
    /// generated in `Session::new`.
    session_id: Uuid,
    /// Channel on which events are emitted to the client.
    tx_event: Sender<Event>,

    /// Manager used to list and call MCP tools.
    mcp_connection_manager: McpConnectionManager,

    /// Optional external notifier command (program followed by its
    /// arguments); `maybe_notify` appends a JSON payload as the last argument.
    notify: Option<Vec<String>>,

    /// Rollout recorder; `None` when it could not be initialised or after
    /// the shutdown op has taken it.
    rollout: Mutex<Option<RolloutRecorder>>,
    /// Mutable session state (current task, pending approvals, history, ...).
    state: Mutex<State>,
    /// Copied from `Config::codex_linux_sandbox_exe`.
    /// NOTE(review): presumably the path of the Linux sandbox helper — confirm.
    codex_linux_sandbox_exe: Option<PathBuf>,
    /// Shell returned by `shell::default_user_shell()` at session creation.
    user_shell: shell::Shell,
    /// Copied from `Config::show_raw_agent_reasoning`.
    show_raw_agent_reasoning: bool,
}
255
/// Settings that apply to a turn: the model client plus the policies,
/// instructions and working directory it runs with. A new value is built when
/// the context is overridden or when a `UserTurn` supplies its own settings.
#[derive(Debug)]
pub(crate) struct TurnContext {
    /// Model client used for requests made during the turn.
    pub(crate) client: ModelClient,
    /// Working directory; `resolve_path` joins relative paths onto it.
    pub(crate) cwd: PathBuf,
    /// Optional base instructions carried over from the session configuration.
    pub(crate) base_instructions: Option<String>,
    /// Optional user instructions carried over from the session configuration.
    pub(crate) user_instructions: Option<String>,
    /// Approval policy in effect for the turn.
    pub(crate) approval_policy: AskForApproval,
    /// Sandbox policy in effect for the turn.
    pub(crate) sandbox_policy: SandboxPolicy,
    /// Shell environment policy, cloned from the configuration.
    pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
    /// Copied from the configuration's `disable_response_storage` flag.
    pub(crate) disable_response_storage: bool,
    /// Tool configuration derived from the model family and the policies.
    pub(crate) tools_config: ToolsConfig,
}
272
273impl TurnContext {
274 fn resolve_path(&self, path: Option<String>) -> PathBuf {
275 path.as_ref()
276 .map(PathBuf::from)
277 .map_or_else(|| self.cwd.clone(), |p| self.cwd.join(p))
278 }
279}
280
/// Parameters handed to `Session::new`; `Codex::spawn` fills them from the
/// loaded `Config`.
struct ConfigureSession {
    /// Model provider description passed to the model client.
    provider: ModelProviderInfo,

    /// Name of the model; echoed back in the `SessionConfigured` event.
    model: String,

    /// Reasoning effort setting passed to the model client.
    model_reasoning_effort: ReasoningEffortConfig,
    /// Reasoning summary setting passed to the model client.
    model_reasoning_summary: ReasoningSummaryConfig,

    /// Optional user instructions; recorded as the first conversation item
    /// when present.
    user_instructions: Option<String>,

    /// Optional base instructions stored on the initial turn context.
    base_instructions: Option<String>,

    /// Approval policy for the initial turn context.
    approval_policy: AskForApproval,
    /// Sandbox policy for the initial turn context.
    sandbox_policy: SandboxPolicy,
    /// Stored on the initial turn context unchanged.
    disable_response_storage: bool,

    /// Optional external notifier command (program followed by arguments).
    notify: Option<Vec<String>>,

    /// Working directory of the session; `Session::new` rejects it unless it
    /// is absolute.
    cwd: PathBuf,

    /// When set, the session is resumed from the rollout at this path
    /// instead of starting a new one.
    resume_path: Option<PathBuf>,
}
321
322impl Session {
    /// Builds a session and its initial [`TurnContext`].
    ///
    /// Steps, in order:
    /// 1. reject a non-absolute `cwd`;
    /// 2. concurrently initialise the rollout recorder (new or resumed), the
    ///    MCP connection manager, the default user shell and the message
    ///    history metadata;
    /// 3. restore conversation history from a resumed rollout, if any;
    /// 4. record the initial conversation items (user instructions, when
    ///    present, and the environment context);
    /// 5. emit `SessionConfigured` followed by any non-fatal errors collected
    ///    along the way.
    ///
    /// # Errors
    ///
    /// Returns `CodexErr::InvalidWorkingDirectory` when `cwd` is relative, and
    /// `CodexErr::McpServer` when a requested resume fails. Failure to create
    /// a brand-new rollout recorder or the MCP manager is NOT fatal: it is
    /// reported to the client as an `Error` event instead.
    async fn new(
        configure_session: ConfigureSession,
        config: Arc<Config>,
        auth: Option<CodexAuth>,
        tx_event: Sender<Event>,
    ) -> Result<(Arc<Self>, TurnContext)> {
        let ConfigureSession {
            provider,
            model,
            model_reasoning_effort,
            model_reasoning_summary,
            user_instructions,
            base_instructions,
            approval_policy,
            sandbox_policy,
            disable_response_storage,
            notify,
            cwd,
            resume_path,
        } = configure_session;
        debug!("Configuring session: model={model}; provider={provider:?}");
        if !cwd.is_absolute() {
            return Err(CodexErr::InvalidWorkingDirectory(format!(
                "cwd is not absolute: {cwd:?}"
            )));
        }

        // Errors gathered here are sent only after `SessionConfigured`, so
        // the client always sees the configured event first.
        let mut post_session_configured_error_events = Vec::<Event>::new();

        // Yields (session_id, saved session if resuming, recorder).
        let rollout_fut = async {
            match resume_path.as_ref() {
                Some(path) => RolloutRecorder::resume(path, cwd.clone())
                    .await
                    .map(|(rec, saved)| (saved.session_id, Some(saved), rec)),
                None => {
                    let session_id = Uuid::new_v4();
                    RolloutRecorder::new(&config, session_id, user_instructions.clone())
                        .await
                        .map(|rec| (session_id, None, rec))
                }
            }
        };

        let mcp_fut = McpConnectionManager::new(config.mcp_servers.clone());
        let default_shell_fut = shell::default_user_shell();
        let history_meta_fut = crate::message_history::history_metadata(&config);

        // Run the independent initialisation steps concurrently.
        let (rollout_res, mcp_res, default_shell, (history_log_id, history_entry_count)) =
            tokio::join!(rollout_fut, mcp_fut, default_shell_fut, history_meta_fut);

        // Normalised outcome of the rollout initialisation.
        struct RolloutResult {
            session_id: Uuid,
            rollout_recorder: Option<RolloutRecorder>,
            restored_items: Option<Vec<ResponseItem>>,
        }
        let rollout_result = match rollout_res {
            Ok((session_id, maybe_saved, recorder)) => {
                // An empty saved session is treated the same as no history.
                let restored_items: Option<Vec<ResponseItem>> =
                    maybe_saved.and_then(|saved_session| {
                        if saved_session.items.is_empty() {
                            None
                        } else {
                            Some(saved_session.items)
                        }
                    });
                RolloutResult {
                    session_id,
                    rollout_recorder: Some(recorder),
                    restored_items,
                }
            }
            Err(e) => {
                // A failed resume is fatal: the caller explicitly asked for it.
                if let Some(path) = resume_path.as_ref() {
                    return Err(CodexErr::McpServer(format!(
                        "failed to resume rollout from {path:?}: {e}"
                    )));
                }

                // Otherwise continue without a recorder and tell the client.
                let message = format!("failed to initialize rollout recorder: {e}");
                post_session_configured_error_events.push(Event {
                    id: INITIAL_SUBMIT_ID.to_owned(),
                    msg: EventMsg::Error(ErrorEvent {
                        message: message.clone(),
                    }),
                });
                warn!("{message}");

                RolloutResult {
                    session_id: Uuid::new_v4(),
                    rollout_recorder: None,
                    restored_items: None,
                }
            }
        };

        let RolloutResult {
            session_id,
            rollout_recorder,
            restored_items,
        } = rollout_result;

        let mut state = State {
            history: ConversationHistory::new(),
            ..Default::default()
        };
        if let Some(restored_items) = restored_items {
            state.history.record_items(&restored_items);
        }

        // A failed MCP manager degrades to an empty default manager.
        let (mcp_connection_manager, failed_clients) = match mcp_res {
            Ok((mgr, failures)) => (mgr, failures),
            Err(e) => {
                let message = format!("Failed to create MCP connection manager: {e:#}");
                error!("{message}");
                post_session_configured_error_events.push(Event {
                    id: INITIAL_SUBMIT_ID.to_owned(),
                    msg: EventMsg::Error(ErrorEvent { message }),
                });
                (McpConnectionManager::default(), Default::default())
            }
        };

        // Report each MCP client that failed to start individually.
        if !failed_clients.is_empty() {
            for (server_name, err) in failed_clients {
                let message = format!("MCP client for `{server_name}` failed to start: {err:#}");
                error!("{message}");
                post_session_configured_error_events.push(Event {
                    id: INITIAL_SUBMIT_ID.to_owned(),
                    msg: EventMsg::Error(ErrorEvent { message }),
                });
            }
        }

        let client = ModelClient::new(
            config.clone(),
            auth.clone(),
            provider.clone(),
            model_reasoning_effort,
            model_reasoning_summary,
            session_id,
        );
        let turn_context = TurnContext {
            client,
            tools_config: ToolsConfig::new(
                &config.model_family,
                approval_policy,
                sandbox_policy.clone(),
                config.include_plan_tool,
                config.include_apply_patch_tool,
            ),
            user_instructions,
            base_instructions,
            approval_policy,
            sandbox_policy,
            shell_environment_policy: config.shell_environment_policy.clone(),
            cwd,
            disable_response_storage,
        };
        let sess = Arc::new(Session {
            session_id,
            tx_event: tx_event.clone(),
            mcp_connection_manager,
            notify,
            state: Mutex::new(state),
            rollout: Mutex::new(rollout_recorder),
            codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
            user_shell: default_shell,
            show_raw_agent_reasoning: config.show_raw_agent_reasoning,
        });

        // Record the initial items: user instructions (if any) followed by
        // the environment context.
        let mut conversation_items = Vec::<ResponseItem>::with_capacity(2);
        if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
            conversation_items.push(Prompt::format_user_instructions_message(user_instructions));
        }
        conversation_items.push(ResponseItem::from(EnvironmentContext::new(
            turn_context.cwd.to_path_buf(),
            turn_context.approval_policy,
            turn_context.sandbox_policy.clone(),
        )));
        sess.record_conversation_items(&conversation_items).await;

        // `SessionConfigured` goes out first, then the deferred error events.
        let events = std::iter::once(Event {
            id: INITIAL_SUBMIT_ID.to_owned(),
            msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
                session_id,
                model,
                history_log_id,
                history_entry_count,
            }),
        })
        .chain(post_session_configured_error_events.into_iter());
        for event in events {
            if let Err(e) = tx_event.send(event).await {
                error!("failed to send event: {e:?}");
            }
        }

        Ok((sess, turn_context))
    }
539
540 pub fn set_task(&self, task: AgentTask) {
541 let mut state = self.state.lock_unchecked();
542 if let Some(current_task) = state.current_task.take() {
543 current_task.abort(TurnAbortReason::Replaced);
544 }
545 state.current_task = Some(task);
546 }
547
548 pub fn remove_task(&self, sub_id: &str) {
549 let mut state = self.state.lock_unchecked();
550 if let Some(task) = &state.current_task
551 && task.sub_id == sub_id
552 {
553 state.current_task.take();
554 }
555 }
556
557 pub(crate) async fn send_event(&self, event: Event) {
560 if let Err(e) = self.tx_event.send(event).await {
561 error!("failed to send tool call event: {e}");
562 }
563 }
564
565 pub async fn request_command_approval(
566 &self,
567 sub_id: String,
568 call_id: String,
569 command: Vec<String>,
570 cwd: PathBuf,
571 reason: Option<String>,
572 ) -> oneshot::Receiver<ReviewDecision> {
573 let (tx_approve, rx_approve) = oneshot::channel();
574 let event = Event {
575 id: sub_id.clone(),
576 msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
577 call_id,
578 command,
579 cwd,
580 reason,
581 }),
582 };
583 let _ = self.tx_event.send(event).await;
584 {
585 let mut state = self.state.lock_unchecked();
586 state.pending_approvals.insert(sub_id, tx_approve);
587 }
588 rx_approve
589 }
590
591 pub async fn request_patch_approval(
592 &self,
593 sub_id: String,
594 call_id: String,
595 action: &ApplyPatchAction,
596 reason: Option<String>,
597 grant_root: Option<PathBuf>,
598 ) -> oneshot::Receiver<ReviewDecision> {
599 let (tx_approve, rx_approve) = oneshot::channel();
600 let event = Event {
601 id: sub_id.clone(),
602 msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
603 call_id,
604 changes: convert_apply_patch_to_protocol(action),
605 reason,
606 grant_root,
607 }),
608 };
609 let _ = self.tx_event.send(event).await;
610 {
611 let mut state = self.state.lock_unchecked();
612 state.pending_approvals.insert(sub_id, tx_approve);
613 }
614 rx_approve
615 }
616
617 pub fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) {
618 let mut state = self.state.lock_unchecked();
619 if let Some(tx_approve) = state.pending_approvals.remove(sub_id) {
620 tx_approve.send(decision).ok();
621 }
622 }
623
624 pub fn add_approved_command(&self, cmd: Vec<String>) {
625 let mut state = self.state.lock_unchecked();
626 state.approved_commands.insert(cmd);
627 }
628
629 async fn record_conversation_items(&self, items: &[ResponseItem]) {
632 debug!("Recording items for conversation: {items:?}");
633 self.record_state_snapshot(items).await;
634
635 self.state.lock_unchecked().history.record_items(items);
636 }
637
638 async fn record_state_snapshot(&self, items: &[ResponseItem]) {
639 let snapshot = { crate::rollout::SessionStateSnapshot {} };
640
641 let recorder = {
642 let guard = self.rollout.lock_unchecked();
643 guard.as_ref().cloned()
644 };
645
646 if let Some(rec) = recorder {
647 if let Err(e) = rec.record_state(snapshot).await {
648 error!("failed to record rollout state: {e:#}");
649 }
650 if let Err(e) = rec.record_items(items).await {
651 error!("failed to record rollout items: {e:#}");
652 }
653 }
654 }
655
656 async fn on_exec_command_begin(
657 &self,
658 turn_diff_tracker: &mut TurnDiffTracker,
659 exec_command_context: ExecCommandContext,
660 ) {
661 let ExecCommandContext {
662 sub_id,
663 call_id,
664 command_for_display,
665 cwd,
666 apply_patch,
667 } = exec_command_context;
668 let msg = match apply_patch {
669 Some(ApplyPatchCommandContext {
670 user_explicitly_approved_this_action,
671 changes,
672 }) => {
673 turn_diff_tracker.on_patch_begin(&changes);
674
675 EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
676 call_id,
677 auto_approved: !user_explicitly_approved_this_action,
678 changes,
679 })
680 }
681 None => EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
682 call_id,
683 command: command_for_display.clone(),
684 cwd,
685 parsed_cmd: parse_command(&command_for_display)
686 .into_iter()
687 .map(Into::into)
688 .collect(),
689 }),
690 };
691 let event = Event {
692 id: sub_id.to_string(),
693 msg,
694 };
695 let _ = self.tx_event.send(event).await;
696 }
697
698 #[allow(clippy::too_many_arguments)]
699 async fn on_exec_command_end(
700 &self,
701 turn_diff_tracker: &mut TurnDiffTracker,
702 sub_id: &str,
703 call_id: &str,
704 output: &ExecToolCallOutput,
705 is_apply_patch: bool,
706 ) {
707 let ExecToolCallOutput {
708 stdout,
709 stderr,
710 duration,
711 exit_code,
712 } = output;
713 const MAX_STREAM_OUTPUT: usize = 5 * 1024; let stdout = stdout.text.chars().take(MAX_STREAM_OUTPUT).collect();
717 let stderr = stderr.text.chars().take(MAX_STREAM_OUTPUT).collect();
718
719 let msg = if is_apply_patch {
720 EventMsg::PatchApplyEnd(PatchApplyEndEvent {
721 call_id: call_id.to_string(),
722 stdout,
723 stderr,
724 success: *exit_code == 0,
725 })
726 } else {
727 EventMsg::ExecCommandEnd(ExecCommandEndEvent {
728 call_id: call_id.to_string(),
729 stdout,
730 stderr,
731 duration: *duration,
732 exit_code: *exit_code,
733 })
734 };
735
736 let event = Event {
737 id: sub_id.to_string(),
738 msg,
739 };
740 let _ = self.tx_event.send(event).await;
741
742 if is_apply_patch {
745 let unified_diff = turn_diff_tracker.get_unified_diff();
746 if let Ok(Some(unified_diff)) = unified_diff {
747 let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
748 let event = Event {
749 id: sub_id.into(),
750 msg,
751 };
752 let _ = self.tx_event.send(event).await;
753 }
754 }
755 }
756 async fn run_exec_with_events<'a>(
761 &self,
762 turn_diff_tracker: &mut TurnDiffTracker,
763 begin_ctx: ExecCommandContext,
764 exec_args: ExecInvokeArgs<'a>,
765 ) -> crate::error::Result<ExecToolCallOutput> {
766 let is_apply_patch = begin_ctx.apply_patch.is_some();
767 let sub_id = begin_ctx.sub_id.clone();
768 let call_id = begin_ctx.call_id.clone();
769
770 self.on_exec_command_begin(turn_diff_tracker, begin_ctx.clone())
771 .await;
772
773 let default_mode_restrictions = crate::modes::ModeRestrictions {
776 allow_file_write: true,
777 allow_command_exec: true,
778 allow_network_access: true,
779 allow_git_operations: true,
780 max_file_size: None,
781 };
782
783 let result = process_exec_tool_call(
784 exec_args.params,
785 exec_args.sandbox_type,
786 exec_args.sandbox_policy,
787 exec_args.codex_linux_sandbox_exe,
788 exec_args.stdout_stream,
789 &default_mode_restrictions,
790 )
791 .await;
792
793 let output_stderr;
794 let borrowed: &ExecToolCallOutput = match &result {
795 Ok(output) => output,
796 Err(e) => {
797 output_stderr = ExecToolCallOutput {
798 exit_code: -1,
799 stdout: StreamOutput::new(String::new()),
800 stderr: StreamOutput::new(get_error_message_ui(e)),
801 duration: Duration::default(),
802 };
803 &output_stderr
804 }
805 };
806 self.on_exec_command_end(
807 turn_diff_tracker,
808 &sub_id,
809 &call_id,
810 borrowed,
811 is_apply_patch,
812 )
813 .await;
814
815 result
816 }
817
818 async fn notify_background_event(&self, sub_id: &str, message: impl Into<String>) {
822 let event = Event {
823 id: sub_id.to_string(),
824 msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
825 message: message.into(),
826 }),
827 };
828 let _ = self.tx_event.send(event).await;
829 }
830
831 pub fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
834 [self.state.lock_unchecked().history.contents(), extra].concat()
835 }
836
837 pub fn inject_input(&self, input: Vec<InputItem>) -> std::result::Result<(), Vec<InputItem>> {
839 let mut state = self.state.lock_unchecked();
840 if state.current_task.is_some() {
841 state.pending_input.push(input.into());
842 Ok(())
843 } else {
844 Err(input)
845 }
846 }
847
848 pub fn get_pending_input(&self) -> Vec<ResponseInputItem> {
849 let mut state = self.state.lock_unchecked();
850 if state.pending_input.is_empty() {
851 Vec::with_capacity(0)
852 } else {
853 let mut ret = Vec::new();
854 std::mem::swap(&mut ret, &mut state.pending_input);
855 ret
856 }
857 }
858
859 pub async fn call_tool(
860 &self,
861 server: &str,
862 tool: &str,
863 arguments: Option<serde_json::Value>,
864 timeout: Option<Duration>,
865 ) -> Result<CallToolResult> {
866 self.mcp_connection_manager
867 .call_tool(server, tool, arguments, timeout)
868 .await
869 }
870
871 fn interrupt_task(&self) {
872 info!("interrupt received: abort current task, if any");
873 let mut state = self.state.lock_unchecked();
874 state.pending_approvals.clear();
875 state.pending_input.clear();
876 if let Some(task) = state.current_task.take() {
877 task.abort(TurnAbortReason::Interrupted);
878 }
879 }
880
881 fn maybe_notify(&self, notification: UserNotification) {
885 let Some(notify_command) = &self.notify else {
886 return;
887 };
888
889 if notify_command.is_empty() {
890 return;
891 }
892
893 let Ok(json) = serde_json::to_string(¬ification) else {
894 error!("failed to serialise notification payload");
895 return;
896 };
897
898 let mut command = std::process::Command::new(¬ify_command[0]);
899 if notify_command.len() > 1 {
900 command.args(¬ify_command[1..]);
901 }
902 command.arg(json);
903
904 if let Err(e) = command.spawn() {
906 warn!("failed to spawn notifier '{}': {e}", notify_command[0]);
907 }
908 }
909}
910
/// Dropping a session interrupts it: pending approvals and queued input are
/// cleared and any current task is aborted (see `Session::interrupt_task`).
impl Drop for Session {
    fn drop(&mut self) {
        self.interrupt_task();
    }
}
916
/// Describes a command execution for the purpose of emitting its begin/end
/// events.
#[derive(Clone, Debug)]
pub(crate) struct ExecCommandContext {
    /// Submission id used as the id of the emitted events.
    pub(crate) sub_id: String,
    /// Identifier of the tool call, echoed in the begin/end events.
    pub(crate) call_id: String,
    /// Command reported in `ExecCommandBegin` (also parsed for that event).
    pub(crate) command_for_display: Vec<String>,
    /// Working directory reported in `ExecCommandBegin`.
    pub(crate) cwd: PathBuf,
    /// Present when the execution is a patch application; switches the
    /// emitted events to the `PatchApply*` variants.
    pub(crate) apply_patch: Option<ApplyPatchCommandContext>,
}
925
/// Extra context attached to an [`ExecCommandContext`] when the command
/// applies a patch.
#[derive(Clone, Debug)]
pub(crate) struct ApplyPatchCommandContext {
    /// Whether the user explicitly approved this patch; its negation is
    /// reported as `auto_approved` in `PatchApplyBegin`.
    pub(crate) user_explicitly_approved_this_action: bool,
    /// File changes of the patch, keyed by path.
    pub(crate) changes: HashMap<PathBuf, FileChange>,
}
931
/// Handle to a spawned agent task that allows it to be aborted.
pub(crate) struct AgentTask {
    /// Session the task belongs to; used to emit `TurnAborted` on abort.
    sess: Arc<Session>,
    /// Id of the submission that started the task.
    sub_id: String,
    /// Abort handle of the spawned tokio task.
    handle: AbortHandle,
}
938
939impl AgentTask {
940 fn spawn(
941 sess: Arc<Session>,
942 turn_context: Arc<TurnContext>,
943 sub_id: String,
944 input: Vec<InputItem>,
945 ) -> Self {
946 let handle = {
947 let sess = sess.clone();
948 let sub_id = sub_id.clone();
949 let tc = Arc::clone(&turn_context);
950 tokio::spawn(async move { run_task(sess, tc.as_ref(), sub_id, input).await })
951 .abort_handle()
952 };
953 Self {
954 sess,
955 sub_id,
956 handle,
957 }
958 }
959
960 fn compact(
961 sess: Arc<Session>,
962 turn_context: Arc<TurnContext>,
963 sub_id: String,
964 input: Vec<InputItem>,
965 compact_instructions: String,
966 ) -> Self {
967 let handle = {
968 let sess = sess.clone();
969 let sub_id = sub_id.clone();
970 let tc = Arc::clone(&turn_context);
971 tokio::spawn(async move {
972 run_compact_task(sess, tc.as_ref(), sub_id, input, compact_instructions).await
973 })
974 .abort_handle()
975 };
976 Self {
977 sess,
978 sub_id,
979 handle,
980 }
981 }
982
983 fn abort(self, reason: TurnAbortReason) {
984 if !self.handle.is_finished() {
986 self.handle.abort();
987 let event = Event {
988 id: self.sub_id,
989 msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
990 };
991 let tx_event = self.sess.tx_event.clone();
992 tokio::spawn(async move {
993 tx_event.send(event).await.ok();
994 });
995 }
996 }
997}
998
/// Main dispatch loop of a session.
///
/// Receives submissions from `rx_sub` until the channel closes or a
/// `Shutdown` op is processed, and handles each op:
/// - `Interrupt`: aborts the current task.
/// - `OverrideTurnContext`: replaces the persistent turn context, keeping
///   previous values for anything not overridden.
/// - `UserInput` / `UserTurn`: queue the input onto the running task, or spawn
///   a new task when none is running (`UserTurn` with a one-off context).
/// - `ExecApproval` / `PatchApproval`: resolve a pending approval, or
///   interrupt on `Abort`.
/// - `AddToHistory` / `GetHistoryEntryRequest`: message-history access,
///   performed on spawned tasks.
/// - `ListMcpTools`: replies with all known MCP tools.
/// - `Compact`: starts a summarisation task when no task is running.
/// - `Shutdown`: shuts the rollout recorder down, emits `ShutdownComplete`
///   and leaves the loop.
///
/// Any other op is ignored.
async fn submission_loop(
    sess: Arc<Session>,
    turn_context: TurnContext,
    config: Arc<Config>,
    rx_sub: Receiver<Submission>,
) {
    // Shared with spawned tasks; replaced wholesale on override.
    let mut turn_context = Arc::new(turn_context);
    while let Ok(sub) = rx_sub.recv().await {
        debug!(?sub, "Submission");
        match sub.op {
            Op::Interrupt => {
                sess.interrupt_task();
            }
            Op::OverrideTurnContext {
                cwd,
                approval_policy,
                sandbox_policy,
                model,
                effort,
                summary,
            } => {
                let prev = Arc::clone(&turn_context);
                let provider = prev.client.get_provider();

                // A new model picks its own family, falling back to the
                // configured family when the model is not recognised.
                let (effective_model, effective_family) = if let Some(m) = model {
                    let fam =
                        find_family_for_model(&m).unwrap_or_else(|| config.model_family.clone());
                    (m, fam)
                } else {
                    (prev.client.get_model(), prev.client.get_model_family())
                };

                let effective_effort = effort.unwrap_or(prev.client.get_reasoning_effort());
                let effective_summary = summary.unwrap_or(prev.client.get_reasoning_summary());

                let auth = prev.client.get_auth();
                // The client needs a config that reflects the effective model.
                let mut updated_config = (*config).clone();
                updated_config.model = effective_model.clone();
                updated_config.model_family = effective_family.clone();

                let client = ModelClient::new(
                    Arc::new(updated_config),
                    auth,
                    provider,
                    effective_effort,
                    effective_summary,
                    sess.session_id,
                );

                let new_approval_policy = approval_policy.unwrap_or(prev.approval_policy);
                let new_sandbox_policy = sandbox_policy
                    .clone()
                    .unwrap_or(prev.sandbox_policy.clone());
                let new_cwd = cwd.clone().unwrap_or_else(|| prev.cwd.clone());

                let tools_config = ToolsConfig::new(
                    &effective_family,
                    new_approval_policy,
                    new_sandbox_policy.clone(),
                    config.include_plan_tool,
                    config.include_apply_patch_tool,
                );

                let new_turn_context = TurnContext {
                    client,
                    tools_config,
                    user_instructions: prev.user_instructions.clone(),
                    base_instructions: prev.base_instructions.clone(),
                    approval_policy: new_approval_policy,
                    sandbox_policy: new_sandbox_policy.clone(),
                    shell_environment_policy: prev.shell_environment_policy.clone(),
                    cwd: new_cwd.clone(),
                    disable_response_storage: prev.disable_response_storage,
                };

                turn_context = Arc::new(new_turn_context);
                // Record a fresh environment context only when one of the
                // values it is built from was actually overridden.
                if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
                    sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
                        new_cwd,
                        new_approval_policy,
                        new_sandbox_policy,
                    ))])
                    .await;
                }
            }
            Op::UserInput { items } => {
                // `inject_input` hands the items back when no task is
                // running, in which case a new task is started for them.
                if let Err(items) = sess.inject_input(items) {
                    let task =
                        AgentTask::spawn(sess.clone(), Arc::clone(&turn_context), sub.id, items);
                    sess.set_task(task);
                }
            }
            Op::UserTurn {
                items,
                cwd,
                approval_policy,
                sandbox_policy,
                model,
                effort,
                summary,
            } => {
                // Per-turn settings are applied only when a new task is
                // started; if a task is already running the items are queued
                // onto it and the settings in this op are not used.
                if let Err(items) = sess.inject_input(items) {
                    let provider = turn_context.client.get_provider();

                    let model_family = find_family_for_model(&model)
                        .unwrap_or_else(|| config.model_family.clone());

                    let mut per_turn_config = (*config).clone();
                    per_turn_config.model = model.clone();
                    per_turn_config.model_family = model_family.clone();

                    // NOTE(review): auth is passed as `None` here, unlike the
                    // override path which reuses the previous client's auth —
                    // confirm this is intentional.
                    let client = ModelClient::new(
                        Arc::new(per_turn_config),
                        None,
                        provider,
                        effort,
                        summary,
                        sess.session_id,
                    );

                    let fresh_turn_context = TurnContext {
                        client,
                        tools_config: ToolsConfig::new(
                            &model_family,
                            approval_policy,
                            sandbox_policy.clone(),
                            config.include_plan_tool,
                            config.include_apply_patch_tool,
                        ),
                        user_instructions: turn_context.user_instructions.clone(),
                        base_instructions: turn_context.base_instructions.clone(),
                        approval_policy,
                        sandbox_policy,
                        shell_environment_policy: turn_context.shell_environment_policy.clone(),
                        cwd,
                        disable_response_storage: turn_context.disable_response_storage,
                    };
                    // The one-off context is handed to the task only; the
                    // loop's persistent `turn_context` is left untouched.
                    let task =
                        AgentTask::spawn(sess.clone(), Arc::new(fresh_turn_context), sub.id, items);
                    sess.set_task(task);
                }
            }
            Op::ExecApproval { id, decision } => match decision {
                ReviewDecision::Abort => {
                    sess.interrupt_task();
                }
                other => sess.notify_approval(&id, other),
            },
            Op::PatchApproval { id, decision } => match decision {
                ReviewDecision::Abort => {
                    sess.interrupt_task();
                }
                other => sess.notify_approval(&id, other),
            },
            Op::AddToHistory { text } => {
                let id = sess.session_id;
                let config = config.clone();
                // Appended on a separate task so the loop is not held up.
                tokio::spawn(async move {
                    if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await
                    {
                        warn!("failed to append to message history: {e}");
                    }
                });
            }

            Op::GetHistoryEntryRequest { offset, log_id } => {
                let config = config.clone();
                let tx_event = sess.tx_event.clone();
                let sub_id = sub.id.clone();

                tokio::spawn(async move {
                    // The lookup runs via `spawn_blocking`; a failed join is
                    // treated as "no entry".
                    let entry_opt = tokio::task::spawn_blocking(move || {
                        crate::message_history::lookup(log_id, offset, &config)
                    })
                    .await
                    .unwrap_or(None);

                    let event = Event {
                        id: sub_id,
                        msg: EventMsg::GetHistoryEntryResponse(
                            crate::protocol::GetHistoryEntryResponseEvent {
                                offset,
                                log_id,
                                entry: entry_opt.map(|e| {
                                    agcodex_protocol::message_history::HistoryEntry {
                                        session_id: e.session_id,
                                        ts: e.ts,
                                        text: e.text,
                                    }
                                }),
                            },
                        ),
                    };

                    if let Err(e) = tx_event.send(event).await {
                        warn!("failed to send GetHistoryEntryResponse event: {e}");
                    }
                });
            }
            Op::ListMcpTools => {
                let tx_event = sess.tx_event.clone();
                let sub_id = sub.id.clone();

                let tools = sess.mcp_connection_manager.list_all_tools();
                let event = Event {
                    id: sub_id,
                    msg: EventMsg::McpListToolsResponse(
                        crate::protocol::McpListToolsResponseEvent { tools },
                    ),
                };
                if let Err(e) = tx_event.send(event).await {
                    warn!("failed to send McpListToolsResponse event: {e}");
                }
            }
            Op::Compact => {
                // Instructions for the compact task, embedded at compile time.
                const SUMMARIZATION_PROMPT: &str = include_str!("prompt_for_compact_command.md");

                // If a task is running, the trigger text is queued onto it;
                // otherwise a dedicated compact task is started.
                if let Err(items) = sess.inject_input(vec![InputItem::Text {
                    text: "Start Summarization".to_string(),
                }]) {
                    let task = AgentTask::compact(
                        sess.clone(),
                        Arc::clone(&turn_context),
                        sub.id,
                        items,
                        SUMMARIZATION_PROMPT.to_string(),
                    );
                    sess.set_task(task);
                }
            }
            Op::Shutdown => {
                info!("Shutting down Codex instance");

                // Take the recorder out so the mutex guard is released before
                // awaiting its shutdown.
                let recorder_opt = sess.rollout.lock_unchecked().take();
                if let Some(rec) = recorder_opt
                    && let Err(e) = rec.shutdown().await
                {
                    warn!("failed to shutdown rollout recorder: {e}");
                    let event = Event {
                        id: sub.id.clone(),
                        msg: EventMsg::Error(ErrorEvent {
                            message: "Failed to shutdown rollout recorder".to_string(),
                        }),
                    };
                    if let Err(e) = sess.tx_event.send(event).await {
                        warn!("failed to send error message: {e:?}");
                    }
                }

                // `ShutdownComplete` is sent even if the recorder failed.
                let event = Event {
                    id: sub.id.clone(),
                    msg: EventMsg::ShutdownComplete,
                };
                if let Err(e) = sess.tx_event.send(event).await {
                    warn!("failed to send Shutdown event: {e}");
                }
                break;
            }
            _ => {
                // Any other op is ignored by this loop.
            }
        }
    }
    debug!("Agent loop exited");
}
1287
1288async fn run_task(
1302 sess: Arc<Session>,
1303 turn_context: &TurnContext,
1304 sub_id: String,
1305 input: Vec<InputItem>,
1306) {
1307 if input.is_empty() {
1308 return;
1309 }
1310 let event = Event {
1311 id: sub_id.clone(),
1312 msg: EventMsg::TaskStarted,
1313 };
1314 if sess.tx_event.send(event).await.is_err() {
1315 return;
1316 }
1317
1318 let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
1319 sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
1320 .await;
1321
1322 let mut last_agent_message: Option<String> = None;
1323 let mut turn_diff_tracker = TurnDiffTracker::new();
1326
1327 loop {
1328 let pending_input = sess
1332 .get_pending_input()
1333 .into_iter()
1334 .map(ResponseItem::from)
1335 .collect::<Vec<ResponseItem>>();
1336 sess.record_conversation_items(&pending_input).await;
1337
1338 let turn_input: Vec<ResponseItem> = sess.turn_input_with_history(pending_input);
1344
1345 let turn_input_messages: Vec<String> = turn_input
1346 .iter()
1347 .filter_map(|item| match item {
1348 ResponseItem::Message { content, .. } => Some(content),
1349 _ => None,
1350 })
1351 .flat_map(|content| {
1352 content.iter().filter_map(|item| match item {
1353 ContentItem::OutputText { text } => Some(text.clone()),
1354 _ => None,
1355 })
1356 })
1357 .collect();
1358 match run_turn(
1359 &sess,
1360 turn_context,
1361 &mut turn_diff_tracker,
1362 sub_id.clone(),
1363 turn_input,
1364 )
1365 .await
1366 {
1367 Ok(turn_output) => {
1368 let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
1369 let mut responses = Vec::<ResponseInputItem>::new();
1370 for processed_response_item in turn_output {
1371 let ProcessedResponseItem { item, response } = processed_response_item;
1372 match (&item, &response) {
1373 (ResponseItem::Message { role, .. }, None) if role == "assistant" => {
1374 items_to_record_in_conversation_history.push(item);
1376 }
1377 (
1378 ResponseItem::LocalShellCall { .. },
1379 Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
1380 ) => {
1381 items_to_record_in_conversation_history.push(item);
1382 items_to_record_in_conversation_history.push(
1383 ResponseItem::FunctionCallOutput {
1384 call_id: call_id.clone(),
1385 output: output.clone(),
1386 },
1387 );
1388 }
1389 (
1390 ResponseItem::FunctionCall { .. },
1391 Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
1392 ) => {
1393 items_to_record_in_conversation_history.push(item);
1394 items_to_record_in_conversation_history.push(
1395 ResponseItem::FunctionCallOutput {
1396 call_id: call_id.clone(),
1397 output: output.clone(),
1398 },
1399 );
1400 }
1401 (
1402 ResponseItem::FunctionCall { .. },
1403 Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
1404 ) => {
1405 items_to_record_in_conversation_history.push(item);
1406 let (content, success): (String, Option<bool>) = match result {
1407 Ok(CallToolResult {
1408 content,
1409 is_error,
1410 structured_content: _,
1411 }) => match serde_json::to_string(content) {
1412 Ok(content) => (content, *is_error),
1413 Err(e) => {
1414 warn!("Failed to serialize MCP tool call output: {e}");
1415 (e.to_string(), Some(true))
1416 }
1417 },
1418 Err(e) => (e.clone(), Some(true)),
1419 };
1420 items_to_record_in_conversation_history.push(
1421 ResponseItem::FunctionCallOutput {
1422 call_id: call_id.clone(),
1423 output: FunctionCallOutputPayload { content, success },
1424 },
1425 );
1426 }
1427 (
1428 ResponseItem::Reasoning {
1429 id,
1430 summary,
1431 content,
1432 encrypted_content,
1433 },
1434 None,
1435 ) => {
1436 items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
1437 id: id.clone(),
1438 summary: summary.clone(),
1439 content: content.clone(),
1440 encrypted_content: encrypted_content.clone(),
1441 });
1442 }
1443 _ => {
1444 warn!("Unexpected response item: {item:?} with response: {response:?}");
1445 }
1446 };
1447 if let Some(response) = response {
1448 responses.push(response);
1449 }
1450 }
1451
1452 if !items_to_record_in_conversation_history.is_empty() {
1454 sess.record_conversation_items(&items_to_record_in_conversation_history)
1455 .await;
1456 }
1457
1458 if responses.is_empty() {
1459 debug!("Turn completed");
1460 last_agent_message = get_last_assistant_message_from_turn(
1461 &items_to_record_in_conversation_history,
1462 );
1463 sess.maybe_notify(UserNotification::AgentTurnComplete {
1464 turn_id: sub_id.clone(),
1465 input_messages: turn_input_messages,
1466 last_assistant_message: last_agent_message.clone(),
1467 });
1468 break;
1469 }
1470 }
1471 Err(e) => {
1472 info!("Turn error: {e:#}");
1473 let event = Event {
1474 id: sub_id.clone(),
1475 msg: EventMsg::Error(ErrorEvent {
1476 message: e.to_string(),
1477 }),
1478 };
1479 sess.tx_event.send(event).await.ok();
1480 break;
1482 }
1483 }
1484 }
1485 sess.remove_task(&sub_id);
1486 let event = Event {
1487 id: sub_id,
1488 msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
1489 };
1490 sess.tx_event.send(event).await.ok();
1491}
1492
1493async fn run_turn(
1494 sess: &Session,
1495 turn_context: &TurnContext,
1496 turn_diff_tracker: &mut TurnDiffTracker,
1497 sub_id: String,
1498 input: Vec<ResponseItem>,
1499) -> CodexResult<Vec<ProcessedResponseItem>> {
1500 let tools = get_openai_tools(
1501 &turn_context.tools_config,
1502 Some(sess.mcp_connection_manager.list_all_tools()),
1503 );
1504
1505 let prompt = Prompt {
1506 input,
1507 store: !turn_context.disable_response_storage,
1508 tools,
1509 base_instructions_override: turn_context.base_instructions.clone(),
1510 };
1511
1512 let mut retries = 0;
1513 loop {
1514 match try_run_turn(sess, turn_context, turn_diff_tracker, &sub_id, &prompt).await {
1515 Ok(output) => return Ok(output),
1516 Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
1517 Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
1518 Err(e @ (CodexErr::UsageLimitReached(_) | CodexErr::UsageNotIncluded)) => {
1519 return Err(e);
1520 }
1521 Err(e) => {
1522 let max_retries = turn_context.client.get_provider().stream_max_retries();
1524 if retries < max_retries {
1525 retries += 1;
1526 let delay = match e {
1527 CodexErr::Stream(_, Some(delay)) => delay,
1528 _ => backoff(retries),
1529 };
1530 warn!(
1531 "stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
1532 );
1533
1534 sess.notify_background_event(
1538 &sub_id,
1539 format!(
1540 "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…"
1541 ),
1542 )
1543 .await;
1544
1545 tokio::time::sleep(delay).await;
1546 } else {
1547 return Err(e);
1548 }
1549 }
1550 }
1551 }
1552}
1553
/// An item received from the model stream together with the result of
/// handling it.
///
/// `response` is `Some` when handling the item produced an output (for
/// example the result of a tool call); `run_task` keeps running turns while
/// any such responses are present.
#[derive(Debug)]
struct ProcessedResponseItem {
    /// The item exactly as it was received from the stream.
    item: ResponseItem,
    /// Output generated by `handle_response_item` for `item`, if any.
    response: Option<ResponseInputItem>,
}
1563
1564async fn try_run_turn(
1565 sess: &Session,
1566 turn_context: &TurnContext,
1567 turn_diff_tracker: &mut TurnDiffTracker,
1568 sub_id: &str,
1569 prompt: &Prompt,
1570) -> CodexResult<Vec<ProcessedResponseItem>> {
1571 let completed_call_ids = prompt
1573 .input
1574 .iter()
1575 .filter_map(|ri| match ri {
1576 ResponseItem::FunctionCallOutput { call_id, .. } => Some(call_id),
1577 ResponseItem::LocalShellCall {
1578 call_id: Some(call_id),
1579 ..
1580 } => Some(call_id),
1581 _ => None,
1582 })
1583 .collect::<Vec<_>>();
1584
1585 let missing_calls = {
1589 prompt
1590 .input
1591 .iter()
1592 .filter_map(|ri| match ri {
1593 ResponseItem::FunctionCall { call_id, .. } => Some(call_id),
1594 ResponseItem::LocalShellCall {
1595 call_id: Some(call_id),
1596 ..
1597 } => Some(call_id),
1598 _ => None,
1599 })
1600 .filter_map(|call_id| {
1601 if completed_call_ids.contains(&call_id) {
1602 None
1603 } else {
1604 Some(call_id.clone())
1605 }
1606 })
1607 .map(|call_id| ResponseItem::FunctionCallOutput {
1608 call_id: call_id.clone(),
1609 output: FunctionCallOutputPayload {
1610 content: "aborted".to_string(),
1611 success: Some(false),
1612 },
1613 })
1614 .collect::<Vec<_>>()
1615 };
1616 let prompt: Cow<Prompt> = if missing_calls.is_empty() {
1617 Cow::Borrowed(prompt)
1618 } else {
1619 let input = [missing_calls, prompt.input.clone()].concat();
1621 Cow::Owned(Prompt {
1622 input,
1623 ..prompt.clone()
1624 })
1625 };
1626
1627 let mut stream = turn_context.client.clone().stream(&prompt).await?;
1628
1629 let mut output = Vec::new();
1630 loop {
1631 let event = stream.next().await;
1635 let Some(event) = event else {
1636 return Err(CodexErr::Stream(
1639 "stream closed before response.completed".into(),
1640 None,
1641 ));
1642 };
1643
1644 let event = match event {
1645 Ok(ev) => ev,
1646 Err(e) => {
1647 return Err(e);
1650 }
1651 };
1652
1653 match event {
1654 ResponseEvent::Created => {}
1655 ResponseEvent::OutputItemDone(item) => {
1656 let response = handle_response_item(
1657 sess,
1658 turn_context,
1659 turn_diff_tracker,
1660 sub_id,
1661 item.clone(),
1662 )
1663 .await?;
1664 output.push(ProcessedResponseItem { item, response });
1665 }
1666 ResponseEvent::Completed {
1667 response_id: _,
1668 token_usage,
1669 } => {
1670 if let Some(token_usage) = token_usage {
1671 sess.tx_event
1672 .send(Event {
1673 id: sub_id.to_string(),
1674 msg: EventMsg::TokenCount(token_usage),
1675 })
1676 .await
1677 .ok();
1678 }
1679
1680 let unified_diff = turn_diff_tracker.get_unified_diff();
1681 if let Ok(Some(unified_diff)) = unified_diff {
1682 let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
1683 let event = Event {
1684 id: sub_id.to_string(),
1685 msg,
1686 };
1687 let _ = sess.tx_event.send(event).await;
1688 }
1689
1690 return Ok(output);
1691 }
1692 ResponseEvent::OutputTextDelta(delta) => {
1693 {
1694 let mut st = sess.state.lock_unchecked();
1695 st.history.append_assistant_text(&delta);
1696 }
1697
1698 let event = Event {
1699 id: sub_id.to_string(),
1700 msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
1701 };
1702 sess.tx_event.send(event).await.ok();
1703 }
1704 ResponseEvent::ReasoningSummaryDelta(delta) => {
1705 let event = Event {
1706 id: sub_id.to_string(),
1707 msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
1708 };
1709 sess.tx_event.send(event).await.ok();
1710 }
1711 ResponseEvent::ReasoningSummaryPartAdded => {
1712 let event = Event {
1713 id: sub_id.to_string(),
1714 msg: EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}),
1715 };
1716 sess.tx_event.send(event).await.ok();
1717 }
1718 ResponseEvent::ReasoningContentDelta(delta) => {
1719 if sess.show_raw_agent_reasoning {
1720 let event = Event {
1721 id: sub_id.to_string(),
1722 msg: EventMsg::AgentReasoningRawContentDelta(
1723 AgentReasoningRawContentDeltaEvent { delta },
1724 ),
1725 };
1726 sess.tx_event.send(event).await.ok();
1727 }
1728 }
1729 }
1730 }
1731}
1732
1733async fn run_compact_task(
1734 sess: Arc<Session>,
1735 turn_context: &TurnContext,
1736 sub_id: String,
1737 input: Vec<InputItem>,
1738 compact_instructions: String,
1739) {
1740 let start_event = Event {
1741 id: sub_id.clone(),
1742 msg: EventMsg::TaskStarted,
1743 };
1744 if sess.tx_event.send(start_event).await.is_err() {
1745 return;
1746 }
1747
1748 let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
1749 let turn_input: Vec<ResponseItem> =
1750 sess.turn_input_with_history(vec![initial_input_for_turn.clone().into()]);
1751
1752 let prompt = Prompt {
1753 input: turn_input,
1754 store: !turn_context.disable_response_storage,
1755 tools: Vec::new(),
1756 base_instructions_override: Some(compact_instructions.clone()),
1757 };
1758
1759 let max_retries = turn_context.client.get_provider().stream_max_retries();
1760 let mut retries = 0;
1761
1762 loop {
1763 let attempt_result = drain_to_completed(&sess, turn_context, &sub_id, &prompt).await;
1764
1765 match attempt_result {
1766 Ok(()) => break,
1767 Err(CodexErr::Interrupted) => return,
1768 Err(e) => {
1769 if retries < max_retries {
1770 retries += 1;
1771 let delay = backoff(retries);
1772 sess.notify_background_event(
1773 &sub_id,
1774 format!(
1775 "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…"
1776 ),
1777 )
1778 .await;
1779 tokio::time::sleep(delay).await;
1780 continue;
1781 } else {
1782 let event = Event {
1783 id: sub_id.clone(),
1784 msg: EventMsg::Error(ErrorEvent {
1785 message: e.to_string(),
1786 }),
1787 };
1788 sess.send_event(event).await;
1789 return;
1790 }
1791 }
1792 }
1793 }
1794
1795 sess.remove_task(&sub_id);
1796 let event = Event {
1797 id: sub_id.clone(),
1798 msg: EventMsg::AgentMessage(AgentMessageEvent {
1799 message: "Compact task completed".to_string(),
1800 }),
1801 };
1802 sess.send_event(event).await;
1803 let event = Event {
1804 id: sub_id.clone(),
1805 msg: EventMsg::TaskComplete(TaskCompleteEvent {
1806 last_agent_message: None,
1807 }),
1808 };
1809 sess.send_event(event).await;
1810
1811 let mut state = sess.state.lock_unchecked();
1812 state.history.keep_last_messages(1);
1813}
1814
1815async fn handle_response_item(
1816 sess: &Session,
1817 turn_context: &TurnContext,
1818 turn_diff_tracker: &mut TurnDiffTracker,
1819 sub_id: &str,
1820 item: ResponseItem,
1821) -> CodexResult<Option<ResponseInputItem>> {
1822 debug!(?item, "Output item");
1823 let output = match item {
1824 ResponseItem::Message { content, .. } => {
1825 for item in content {
1826 if let ContentItem::OutputText { text } = item {
1827 let event = Event {
1828 id: sub_id.to_string(),
1829 msg: EventMsg::AgentMessage(AgentMessageEvent { message: text }),
1830 };
1831 sess.tx_event.send(event).await.ok();
1832 }
1833 }
1834 None
1835 }
1836 ResponseItem::Reasoning {
1837 id: _,
1838 summary,
1839 content,
1840 encrypted_content: _,
1841 } => {
1842 for item in summary {
1843 let text = match item {
1844 ReasoningItemReasoningSummary::SummaryText { text } => text,
1845 };
1846 let event = Event {
1847 id: sub_id.to_string(),
1848 msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
1849 };
1850 sess.tx_event.send(event).await.ok();
1851 }
1852 if sess.show_raw_agent_reasoning
1853 && let Some(content) = content
1854 {
1855 for item in content {
1856 let text = match item {
1857 ReasoningItemContent::ReasoningText { text } => text,
1858 ReasoningItemContent::Text { text } => text,
1859 };
1860 let event = Event {
1861 id: sub_id.to_string(),
1862 msg: EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
1863 text,
1864 }),
1865 };
1866 sess.tx_event.send(event).await.ok();
1867 }
1868 }
1869 None
1870 }
1871 ResponseItem::FunctionCall {
1872 name,
1873 arguments,
1874 call_id,
1875 ..
1876 } => {
1877 info!("FunctionCall: {arguments}");
1878 Some(
1879 handle_function_call(
1880 sess,
1881 turn_context,
1882 turn_diff_tracker,
1883 sub_id.to_string(),
1884 name,
1885 arguments,
1886 call_id,
1887 )
1888 .await,
1889 )
1890 }
1891 ResponseItem::LocalShellCall {
1892 id,
1893 call_id,
1894 status: _,
1895 action,
1896 } => {
1897 let LocalShellAction::Exec(action) = action;
1898 tracing::info!("LocalShellCall: {action:?}");
1899 let params = ShellToolCallParams {
1900 command: action.command,
1901 workdir: action.working_directory,
1902 timeout_ms: action.timeout_ms,
1903 with_escalated_permissions: None,
1904 justification: None,
1905 };
1906 let effective_call_id = match (call_id, id) {
1907 (Some(call_id), _) => call_id,
1908 (None, Some(id)) => id,
1909 (None, None) => {
1910 error!("LocalShellCall without call_id or id");
1911 return Ok(Some(ResponseInputItem::FunctionCallOutput {
1912 call_id: "".to_string(),
1913 output: FunctionCallOutputPayload {
1914 content: "LocalShellCall without call_id or id".to_string(),
1915 success: None,
1916 },
1917 }));
1918 }
1919 };
1920
1921 let exec_params = to_exec_params(params, turn_context);
1922 Some(
1923 handle_container_exec_with_params(
1924 exec_params,
1925 sess,
1926 turn_context,
1927 turn_diff_tracker,
1928 sub_id.to_string(),
1929 effective_call_id,
1930 )
1931 .await,
1932 )
1933 }
1934 ResponseItem::FunctionCallOutput { .. } => {
1935 debug!("unexpected FunctionCallOutput from stream");
1936 None
1937 }
1938 ResponseItem::Other => None,
1939 };
1940 Ok(output)
1941}
1942
1943async fn handle_function_call(
1944 sess: &Session,
1945 turn_context: &TurnContext,
1946 turn_diff_tracker: &mut TurnDiffTracker,
1947 sub_id: String,
1948 name: String,
1949 arguments: String,
1950 call_id: String,
1951) -> ResponseInputItem {
1952 match name.as_str() {
1953 "container.exec" | "shell" => {
1954 let params = match parse_container_exec_arguments(arguments, turn_context, &call_id) {
1955 Ok(params) => params,
1956 Err(output) => {
1957 return *output;
1958 }
1959 };
1960 handle_container_exec_with_params(
1961 params,
1962 sess,
1963 turn_context,
1964 turn_diff_tracker,
1965 sub_id,
1966 call_id,
1967 )
1968 .await
1969 }
1970 "apply_patch" => {
1971 let args = match serde_json::from_str::<ApplyPatchToolArgs>(&arguments) {
1972 Ok(a) => a,
1973 Err(e) => {
1974 return ResponseInputItem::FunctionCallOutput {
1975 call_id,
1976 output: FunctionCallOutputPayload {
1977 content: format!("failed to parse function arguments: {e}"),
1978 success: None,
1979 },
1980 };
1981 }
1982 };
1983 let exec_params = ExecParams {
1984 command: vec!["apply_patch".to_string(), args.input.clone()],
1985 cwd: turn_context.cwd.clone(),
1986 timeout_ms: None,
1987 env: HashMap::new(),
1988 with_escalated_permissions: None,
1989 justification: None,
1990 };
1991 handle_container_exec_with_params(
1992 exec_params,
1993 sess,
1994 turn_context,
1995 turn_diff_tracker,
1996 sub_id,
1997 call_id,
1998 )
1999 .await
2000 }
2001 "update_plan" => handle_update_plan(sess, arguments, sub_id, call_id).await,
2002 _ => {
2003 match sess.mcp_connection_manager.parse_tool_name(&name) {
2004 Some((server, tool_name)) => {
2005 let timeout = None;
2007 handle_mcp_tool_call(
2008 sess, &sub_id, call_id, server, tool_name, arguments, timeout,
2009 )
2010 .await
2011 }
2012 None => {
2013 ResponseInputItem::FunctionCallOutput {
2015 call_id,
2016 output: FunctionCallOutputPayload {
2017 content: format!("unsupported call: {name}"),
2018 success: None,
2019 },
2020 }
2021 }
2022 }
2023 }
2024 }
2025}
2026
2027fn to_exec_params(params: ShellToolCallParams, turn_context: &TurnContext) -> ExecParams {
2028 ExecParams {
2029 command: params.command,
2030 cwd: turn_context.resolve_path(params.workdir.clone()),
2031 timeout_ms: params.timeout_ms,
2032 env: create_env(&turn_context.shell_environment_policy),
2033 with_escalated_permissions: params.with_escalated_permissions,
2034 justification: params.justification,
2035 }
2036}
2037
2038fn parse_container_exec_arguments(
2039 arguments: String,
2040 turn_context: &TurnContext,
2041 call_id: &str,
2042) -> std::result::Result<ExecParams, Box<ResponseInputItem>> {
2043 match serde_json::from_str::<ShellToolCallParams>(&arguments) {
2045 Ok(shell_tool_call_params) => Ok(to_exec_params(shell_tool_call_params, turn_context)),
2046 Err(e) => {
2047 let output = ResponseInputItem::FunctionCallOutput {
2049 call_id: call_id.to_string(),
2050 output: FunctionCallOutputPayload {
2051 content: format!("failed to parse function arguments: {e}"),
2052 success: None,
2053 },
2054 };
2055 Err(Box::new(output))
2056 }
2057 }
2058}
2059
/// Arguments bundled for a single exec invocation passed to
/// `Session::run_exec_with_events`.
pub struct ExecInvokeArgs<'a> {
    /// Command, working directory, timeout, environment and approval-related
    /// fields of the command to run.
    pub params: ExecParams,
    /// Sandbox to run the command under (`SandboxType::None` is used by the
    /// callers in this file for explicitly approved runs and retries).
    pub sandbox_type: SandboxType,
    /// Sandbox policy of the current turn.
    pub sandbox_policy: &'a SandboxPolicy,
    /// Optional path taken from `Session::codex_linux_sandbox_exe`
    /// (presumably the Linux sandbox helper binary — confirm at the consumer).
    pub codex_linux_sandbox_exe: &'a Option<PathBuf>,
    /// Optional stream descriptor (submission id, call id, event sender);
    /// presumably used to forward output while the command runs — confirm.
    pub stdout_stream: Option<StdoutStream>,
}
2067
2068fn maybe_run_with_user_profile(
2069 params: ExecParams,
2070 sess: &Session,
2071 turn_context: &TurnContext,
2072) -> ExecParams {
2073 if turn_context.shell_environment_policy.use_profile {
2074 let command = sess
2075 .user_shell
2076 .format_default_shell_invocation(params.command.clone());
2077 if let Some(command) = command {
2078 return ExecParams { command, ..params };
2079 }
2080 }
2081 params
2082}
2083
2084async fn handle_container_exec_with_params(
2085 params: ExecParams,
2086 sess: &Session,
2087 turn_context: &TurnContext,
2088 turn_diff_tracker: &mut TurnDiffTracker,
2089 sub_id: String,
2090 call_id: String,
2091) -> ResponseInputItem {
2092 let apply_patch_exec = match maybe_parse_apply_patch_verified(¶ms.command, ¶ms.cwd) {
2094 MaybeApplyPatchVerified::Body(changes) => {
2095 match apply_patch::apply_patch(sess, turn_context, &sub_id, &call_id, changes).await {
2096 InternalApplyPatchInvocation::Output(item) => return item,
2097 InternalApplyPatchInvocation::DelegateToExec(apply_patch_exec) => {
2098 Some(apply_patch_exec)
2099 }
2100 }
2101 }
2102 MaybeApplyPatchVerified::CorrectnessError(parse_error) => {
2103 return ResponseInputItem::FunctionCallOutput {
2107 call_id,
2108 output: FunctionCallOutputPayload {
2109 content: format!("error: {parse_error:#}"),
2110 success: None,
2111 },
2112 };
2113 }
2114 MaybeApplyPatchVerified::ShellParseError(error) => {
2115 trace!("Failed to parse shell command, {error:?}");
2116 None
2117 }
2118 MaybeApplyPatchVerified::NotApplyPatch => None,
2119 };
2120
2121 let (params, safety, command_for_display) = match &apply_patch_exec {
2122 Some(ApplyPatchExec {
2123 action: ApplyPatchAction { patch, cwd, .. },
2124 user_explicitly_approved_this_action,
2125 }) => {
2126 let path_to_codex = std::env::current_exe()
2127 .ok()
2128 .map(|p| p.to_string_lossy().to_string());
2129 let Some(path_to_codex) = path_to_codex else {
2130 return ResponseInputItem::FunctionCallOutput {
2131 call_id,
2132 output: FunctionCallOutputPayload {
2133 content: "failed to determine path to codex executable".to_string(),
2134 success: None,
2135 },
2136 };
2137 };
2138
2139 let params = ExecParams {
2140 command: vec![
2141 path_to_codex,
2142 CODEX_APPLY_PATCH_ARG1.to_string(),
2143 patch.clone(),
2144 ],
2145 cwd: cwd.clone(),
2146 timeout_ms: params.timeout_ms,
2147 env: HashMap::new(),
2148 with_escalated_permissions: params.with_escalated_permissions,
2149 justification: params.justification.clone(),
2150 };
2151 let safety = if *user_explicitly_approved_this_action {
2152 SafetyCheck::AutoApprove {
2153 sandbox_type: SandboxType::None,
2154 }
2155 } else {
2156 assess_safety_for_untrusted_command(
2157 turn_context.approval_policy,
2158 &turn_context.sandbox_policy,
2159 params.with_escalated_permissions.unwrap_or(false),
2160 )
2161 };
2162 (
2163 params,
2164 safety,
2165 vec!["apply_patch".to_string(), patch.clone()],
2166 )
2167 }
2168 None => {
2169 let safety = {
2170 let state = sess.state.lock_unchecked();
2171 assess_command_safety(
2172 ¶ms.command,
2173 turn_context.approval_policy,
2174 &turn_context.sandbox_policy,
2175 &state.approved_commands,
2176 params.with_escalated_permissions.unwrap_or(false),
2177 )
2178 };
2179 let command_for_display = params.command.clone();
2180 (params, safety, command_for_display)
2181 }
2182 };
2183
2184 let sandbox_type = match safety {
2185 SafetyCheck::AutoApprove { sandbox_type } => sandbox_type,
2186 SafetyCheck::AskUser => {
2187 let rx_approve = sess
2188 .request_command_approval(
2189 sub_id.clone(),
2190 call_id.clone(),
2191 params.command.clone(),
2192 params.cwd.clone(),
2193 params.justification.clone(),
2194 )
2195 .await;
2196 match rx_approve.await.unwrap_or_default() {
2197 ReviewDecision::Approved => (),
2198 ReviewDecision::ApprovedForSession => {
2199 sess.add_approved_command(params.command.clone());
2200 }
2201 ReviewDecision::Denied | ReviewDecision::Abort => {
2202 return ResponseInputItem::FunctionCallOutput {
2203 call_id,
2204 output: FunctionCallOutputPayload {
2205 content: "exec command rejected by user".to_string(),
2206 success: None,
2207 },
2208 };
2209 }
2210 }
2211 SandboxType::None
2216 }
2217 SafetyCheck::Reject { reason } => {
2218 return ResponseInputItem::FunctionCallOutput {
2219 call_id,
2220 output: FunctionCallOutputPayload {
2221 content: format!("exec command rejected: {reason}"),
2222 success: None,
2223 },
2224 };
2225 }
2226 };
2227
2228 let exec_command_context = ExecCommandContext {
2229 sub_id: sub_id.clone(),
2230 call_id: call_id.clone(),
2231 command_for_display: command_for_display.clone(),
2232 cwd: params.cwd.clone(),
2233 apply_patch: apply_patch_exec.map(
2234 |ApplyPatchExec {
2235 action,
2236 user_explicitly_approved_this_action,
2237 }| ApplyPatchCommandContext {
2238 user_explicitly_approved_this_action,
2239 changes: convert_apply_patch_to_protocol(&action),
2240 },
2241 ),
2242 };
2243
2244 let params = maybe_run_with_user_profile(params, sess, turn_context);
2245 let output_result = sess
2246 .run_exec_with_events(
2247 turn_diff_tracker,
2248 exec_command_context.clone(),
2249 ExecInvokeArgs {
2250 params: params.clone(),
2251 sandbox_type,
2252 sandbox_policy: &turn_context.sandbox_policy,
2253 codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
2254 stdout_stream: Some(StdoutStream {
2255 sub_id: sub_id.clone(),
2256 call_id: call_id.clone(),
2257 tx_event: sess.tx_event.clone(),
2258 }),
2259 },
2260 )
2261 .await;
2262
2263 match output_result {
2264 Ok(output) => {
2265 let ExecToolCallOutput { exit_code, .. } = &output;
2266
2267 let is_success = *exit_code == 0;
2268 let content = format_exec_output(output);
2269 ResponseInputItem::FunctionCallOutput {
2270 call_id: call_id.clone(),
2271 output: FunctionCallOutputPayload {
2272 content,
2273 success: Some(is_success),
2274 },
2275 }
2276 }
2277 Err(CodexErr::Sandbox(error)) => {
2278 handle_sandbox_error(
2279 turn_diff_tracker,
2280 params,
2281 exec_command_context,
2282 error,
2283 sandbox_type,
2284 sess,
2285 turn_context,
2286 )
2287 .await
2288 }
2289 Err(e) => ResponseInputItem::FunctionCallOutput {
2290 call_id: call_id.clone(),
2291 output: FunctionCallOutputPayload {
2292 content: format!("execution error: {e}"),
2293 success: None,
2294 },
2295 },
2296 }
2297}
2298
2299async fn handle_sandbox_error(
2300 turn_diff_tracker: &mut TurnDiffTracker,
2301 params: ExecParams,
2302 exec_command_context: ExecCommandContext,
2303 error: SandboxErr,
2304 sandbox_type: SandboxType,
2305 sess: &Session,
2306 turn_context: &TurnContext,
2307) -> ResponseInputItem {
2308 let call_id = exec_command_context.call_id.clone();
2309 let sub_id = exec_command_context.sub_id.clone();
2310 let cwd = exec_command_context.cwd.clone();
2311
2312 match turn_context.approval_policy {
2315 AskForApproval::Never | AskForApproval::OnRequest => {
2316 return ResponseInputItem::FunctionCallOutput {
2317 call_id,
2318 output: FunctionCallOutputPayload {
2319 content: format!(
2320 "failed in sandbox {sandbox_type:?} with execution error: {error}"
2321 ),
2322 success: Some(false),
2323 },
2324 };
2325 }
2326 AskForApproval::UnlessTrusted | AskForApproval::OnFailure => (),
2327 }
2328
2329 if matches!(error, SandboxErr::Timeout) {
2331 return ResponseInputItem::FunctionCallOutput {
2332 call_id,
2333 output: FunctionCallOutputPayload {
2334 content: format!(
2335 "command timed out after {} milliseconds",
2336 params.timeout_duration().as_millis()
2337 ),
2338 success: Some(false),
2339 },
2340 };
2341 }
2342
2343 sess.notify_background_event(&sub_id, format!("Execution failed: {error}"))
2353 .await;
2354
2355 let rx_approve = sess
2356 .request_command_approval(
2357 sub_id.clone(),
2358 call_id.clone(),
2359 params.command.clone(),
2360 cwd.clone(),
2361 Some("command failed; retry without sandbox?".to_string()),
2362 )
2363 .await;
2364
2365 match rx_approve.await.unwrap_or_default() {
2366 ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {
2367 sess.add_approved_command(params.command.clone());
2372 sess.notify_background_event(&sub_id, "retrying command without sandbox")
2374 .await;
2375
2376 let retry_output_result = sess
2379 .run_exec_with_events(
2380 turn_diff_tracker,
2381 exec_command_context.clone(),
2382 ExecInvokeArgs {
2383 params,
2384 sandbox_type: SandboxType::None,
2385 sandbox_policy: &turn_context.sandbox_policy,
2386 codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
2387 stdout_stream: Some(StdoutStream {
2388 sub_id: sub_id.clone(),
2389 call_id: call_id.clone(),
2390 tx_event: sess.tx_event.clone(),
2391 }),
2392 },
2393 )
2394 .await;
2395
2396 match retry_output_result {
2397 Ok(retry_output) => {
2398 let ExecToolCallOutput { exit_code, .. } = &retry_output;
2399
2400 let is_success = *exit_code == 0;
2401 let content = format_exec_output(retry_output);
2402
2403 ResponseInputItem::FunctionCallOutput {
2404 call_id: call_id.clone(),
2405 output: FunctionCallOutputPayload {
2406 content,
2407 success: Some(is_success),
2408 },
2409 }
2410 }
2411 Err(e) => ResponseInputItem::FunctionCallOutput {
2412 call_id: call_id.clone(),
2413 output: FunctionCallOutputPayload {
2414 content: format!("retry failed: {e}"),
2415 success: None,
2416 },
2417 },
2418 }
2419 }
2420 ReviewDecision::Denied | ReviewDecision::Abort => {
2421 ResponseInputItem::FunctionCallOutput {
2423 call_id,
2424 output: FunctionCallOutputPayload {
2425 content: "exec command rejected by user".to_string(),
2426 success: None,
2427 },
2428 }
2429 }
2430 }
2431}
2432
2433fn format_exec_output(exec_output: ExecToolCallOutput) -> String {
2435 let ExecToolCallOutput {
2436 exit_code,
2437 stdout,
2438 stderr,
2439 duration,
2440 } = exec_output;
2441
2442 #[derive(Serialize)]
2443 struct ExecMetadata {
2444 exit_code: i32,
2445 duration_seconds: f32,
2446 }
2447
2448 #[derive(Serialize)]
2449 struct ExecOutput<'a> {
2450 output: &'a str,
2451 metadata: ExecMetadata,
2452 }
2453
2454 let duration_seconds = ((duration.as_secs_f32()) * 10.0).round() / 10.0;
2456
2457 let is_success = exit_code == 0;
2458 let output = if is_success { stdout } else { stderr };
2459
2460 let mut formatted_output = output.text;
2461 if let Some(truncated_after_lines) = output.truncated_after_lines {
2462 formatted_output.push_str(&format!(
2463 "\n\n[Output truncated after {truncated_after_lines} lines: too many lines or bytes.]",
2464 ));
2465 }
2466
2467 let payload = ExecOutput {
2468 output: &formatted_output,
2469 metadata: ExecMetadata {
2470 exit_code,
2471 duration_seconds,
2472 },
2473 };
2474
2475 #[expect(clippy::expect_used)]
2476 serde_json::to_string(&payload).expect("serialize ExecOutput")
2477}
2478
2479fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
2480 responses.iter().rev().find_map(|item| {
2481 if let ResponseItem::Message { role, content, .. } = item {
2482 if role == "assistant" {
2483 content.iter().rev().find_map(|ci| {
2484 if let ContentItem::OutputText { text } = ci {
2485 Some(text.clone())
2486 } else {
2487 None
2488 }
2489 })
2490 } else {
2491 None
2492 }
2493 } else {
2494 None
2495 }
2496 })
2497}
2498
2499async fn drain_to_completed(
2500 sess: &Session,
2501 turn_context: &TurnContext,
2502 sub_id: &str,
2503 prompt: &Prompt,
2504) -> CodexResult<()> {
2505 let mut stream = turn_context.client.clone().stream(prompt).await?;
2506 loop {
2507 let maybe_event = stream.next().await;
2508 let Some(event) = maybe_event else {
2509 return Err(CodexErr::Stream(
2510 "stream closed before response.completed".into(),
2511 None,
2512 ));
2513 };
2514 match event {
2515 Ok(ResponseEvent::OutputItemDone(item)) => {
2516 let mut state = sess.state.lock_unchecked();
2518 state.history.record_items(std::slice::from_ref(&item));
2519 }
2520 Ok(ResponseEvent::Completed {
2521 response_id: _,
2522 token_usage,
2523 }) => {
2524 let token_usage = match token_usage {
2525 Some(usage) => usage,
2526 None => {
2527 return Err(CodexErr::Stream(
2528 "token_usage was None in ResponseEvent::Completed".into(),
2529 None,
2530 ));
2531 }
2532 };
2533 sess.tx_event
2534 .send(Event {
2535 id: sub_id.to_string(),
2536 msg: EventMsg::TokenCount(token_usage),
2537 })
2538 .await
2539 .ok();
2540 return Ok(());
2541 }
2542 Ok(_) => continue,
2543 Err(e) => return Err(e),
2544 }
2545 }
2546}