agcodex_core/
codex.rs

1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::collections::HashSet;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::sync::Mutex;
7use std::sync::MutexGuard;
8use std::sync::atomic::AtomicU64;
9use std::time::Duration;
10
11use agcodex_apply_patch::ApplyPatchAction;
12use agcodex_apply_patch::MaybeApplyPatchVerified;
13use agcodex_apply_patch::maybe_parse_apply_patch_verified;
14use agcodex_login::CodexAuth;
15use agcodex_mcp_types::CallToolResult;
16use agcodex_protocol::protocol::TurnAbortReason;
17use agcodex_protocol::protocol::TurnAbortedEvent;
18use async_channel::Receiver;
19use async_channel::Sender;
20use futures::prelude::*;
21use serde::Serialize;
22use serde_json;
23use tokio::sync::oneshot;
24use tokio::task::AbortHandle;
25use tracing::debug;
26use tracing::error;
27use tracing::info;
28use tracing::trace;
29use tracing::warn;
30use uuid::Uuid;
31
32use crate::ModelProviderInfo;
33use crate::apply_patch;
34use crate::apply_patch::ApplyPatchExec;
35use crate::apply_patch::CODEX_APPLY_PATCH_ARG1;
36use crate::apply_patch::InternalApplyPatchInvocation;
37use crate::apply_patch::convert_apply_patch_to_protocol;
38use crate::client::ModelClient;
39use crate::client_common::Prompt;
40use crate::client_common::ResponseEvent;
41use crate::config::Config;
42use crate::config_types::ShellEnvironmentPolicy;
43use crate::conversation_history::ConversationHistory;
44use crate::environment_context::EnvironmentContext;
45use crate::error::CodexErr;
46use crate::error::Result as CodexResult;
47use crate::error::Result;
48use crate::error::SandboxErr;
49use crate::error::get_error_message_ui;
50use crate::exec::ExecParams;
51use crate::exec::ExecToolCallOutput;
52use crate::exec::SandboxType;
53use crate::exec::StdoutStream;
54use crate::exec::StreamOutput;
55use crate::exec::process_exec_tool_call;
56use crate::exec_env::create_env;
57use crate::mcp_connection_manager::McpConnectionManager;
58use crate::mcp_tool_call::handle_mcp_tool_call;
59use crate::model_family::find_family_for_model;
60use crate::models::ContentItem;
61use crate::models::FunctionCallOutputPayload;
62use crate::models::LocalShellAction;
63use crate::models::ReasoningItemContent;
64use crate::models::ReasoningItemReasoningSummary;
65use crate::models::ResponseInputItem;
66use crate::models::ResponseItem;
67use crate::models::ShellToolCallParams;
68use crate::openai_tools::ApplyPatchToolArgs;
69use crate::openai_tools::ToolsConfig;
70use crate::openai_tools::get_openai_tools;
71use crate::parse_command::parse_command;
72use crate::plan_tool::handle_update_plan;
73use crate::project_doc::get_user_instructions;
74use crate::protocol::AgentMessageDeltaEvent;
75use crate::protocol::AgentMessageEvent;
76use crate::protocol::AgentReasoningDeltaEvent;
77use crate::protocol::AgentReasoningEvent;
78use crate::protocol::AgentReasoningRawContentDeltaEvent;
79use crate::protocol::AgentReasoningRawContentEvent;
80use crate::protocol::AgentReasoningSectionBreakEvent;
81use crate::protocol::ApplyPatchApprovalRequestEvent;
82use crate::protocol::AskForApproval;
83use crate::protocol::BackgroundEventEvent;
84use crate::protocol::ErrorEvent;
85use crate::protocol::Event;
86use crate::protocol::EventMsg;
87use crate::protocol::ExecApprovalRequestEvent;
88use crate::protocol::ExecCommandBeginEvent;
89use crate::protocol::ExecCommandEndEvent;
90use crate::protocol::FileChange;
91use crate::protocol::InputItem;
92use crate::protocol::Op;
93use crate::protocol::PatchApplyBeginEvent;
94use crate::protocol::PatchApplyEndEvent;
95use crate::protocol::ReviewDecision;
96use crate::protocol::SandboxPolicy;
97use crate::protocol::SessionConfiguredEvent;
98use crate::protocol::Submission;
99use crate::protocol::TaskCompleteEvent;
100use crate::protocol::TurnDiffEvent;
101use crate::rollout::RolloutRecorder;
102use crate::safety::SafetyCheck;
103use crate::safety::assess_command_safety;
104use crate::safety::assess_safety_for_untrusted_command;
105use crate::shell;
106use crate::turn_diff_tracker::TurnDiffTracker;
107use crate::user_notification::UserNotification;
108use crate::util::backoff;
109use agcodex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
110use agcodex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
111
112// A convenience extension trait for acquiring mutex locks where poisoning is
113// unrecoverable and should abort the program. This avoids scattered `.unwrap()`
114// calls on `lock()` while still surfacing a clear panic message when a lock is
115// poisoned.
116trait MutexExt<T> {
117    fn lock_unchecked(&self) -> MutexGuard<'_, T>;
118}
119
120impl<T> MutexExt<T> for Mutex<T> {
121    fn lock_unchecked(&self) -> MutexGuard<'_, T> {
122        #[expect(clippy::expect_used)]
123        self.lock().expect("poisoned lock")
124    }
125}
126
127/// The high-level interface to the Codex system.
128/// It operates as a queue pair where you send submissions and receive events.
129pub struct Codex {
130    next_id: AtomicU64,
131    tx_sub: Sender<Submission>,
132    rx_event: Receiver<Event>,
133}
134
135/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`],
136/// the submission id for the initial `ConfigureSession` request and the
137/// unique session id.
138pub struct CodexSpawnOk {
139    pub codex: Codex,
140    pub session_id: Uuid,
141}
142
/// Event id attached to events emitted while the session is being set up
/// (`SessionConfigured` and any start-up errors). These events do not answer
/// a numbered submission.
pub(crate) const INITIAL_SUBMIT_ID: &'static str = "";
144
145impl Codex {
146    /// Spawn a new [`Codex`] and initialize the session.
147    pub async fn spawn(config: Config, auth: Option<CodexAuth>) -> CodexResult<CodexSpawnOk> {
148        let (tx_sub, rx_sub) = async_channel::bounded(64);
149        let (tx_event, rx_event) = async_channel::unbounded();
150
151        let user_instructions = get_user_instructions(&config).await;
152
153        let config = Arc::new(config);
154        let resume_path = config.experimental_resume.clone();
155
156        let configure_session = ConfigureSession {
157            provider: config.model_provider.clone(),
158            model: config.model.clone(),
159            model_reasoning_effort: config.model_reasoning_effort,
160            model_reasoning_summary: config.model_reasoning_summary,
161            user_instructions,
162            base_instructions: config.base_instructions.clone(),
163            approval_policy: config.approval_policy,
164            sandbox_policy: config.sandbox_policy.clone(),
165            disable_response_storage: config.disable_response_storage,
166            notify: config.notify.clone(),
167            cwd: config.cwd.clone(),
168            resume_path,
169        };
170
171        // Generate a unique ID for the lifetime of this Codex session.
172        let (session, turn_context) =
173            Session::new(configure_session, config.clone(), auth, tx_event.clone())
174                .await
175                .map_err(|e| {
176                    error!("Failed to create session: {e:#}");
177                    CodexErr::InternalAgentDied
178                })?;
179        let session_id = session.session_id;
180
181        // This task will run until Op::Shutdown is received.
182        tokio::spawn(submission_loop(session, turn_context, config, rx_sub));
183        let codex = Codex {
184            next_id: AtomicU64::new(0),
185            tx_sub,
186            rx_event,
187        };
188
189        Ok(CodexSpawnOk { codex, session_id })
190    }
191
192    /// Submit the `op` wrapped in a `Submission` with a unique ID.
193    pub async fn submit(&self, op: Op) -> CodexResult<String> {
194        let id = self
195            .next_id
196            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
197            .to_string();
198        let sub = Submission { id: id.clone(), op };
199        self.submit_with_id(sub).await?;
200        Ok(id)
201    }
202
203    /// Use sparingly: prefer `submit()` so Codex is responsible for generating
204    /// unique IDs for each submission.
205    pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
206        self.tx_sub
207            .send(sub)
208            .await
209            .map_err(|_| CodexErr::InternalAgentDied)?;
210        Ok(())
211    }
212
213    pub async fn next_event(&self) -> CodexResult<Event> {
214        let event = self
215            .rx_event
216            .recv()
217            .await
218            .map_err(|_| CodexErr::InternalAgentDied)?;
219        Ok(event)
220    }
221}
222
223/// Mutable state of the agent
224#[derive(Default)]
225struct State {
226    approved_commands: HashSet<Vec<String>>,
227    current_task: Option<AgentTask>,
228    pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
229    pending_input: Vec<ResponseInputItem>,
230    history: ConversationHistory,
231}
232
233/// Context for an initialized model agent
234///
235/// A session has at most 1 running task at a time, and can be interrupted by user input.
236pub(crate) struct Session {
237    session_id: Uuid,
238    tx_event: Sender<Event>,
239
240    /// Manager for external MCP servers/tools.
241    mcp_connection_manager: McpConnectionManager,
242
243    /// External notifier command (will be passed as args to exec()). When
244    /// `None` this feature is disabled.
245    notify: Option<Vec<String>>,
246
247    /// Optional rollout recorder for persisting the conversation transcript so
248    /// sessions can be replayed or inspected later.
249    rollout: Mutex<Option<RolloutRecorder>>,
250    state: Mutex<State>,
251    codex_linux_sandbox_exe: Option<PathBuf>,
252    user_shell: shell::Shell,
253    show_raw_agent_reasoning: bool,
254}
255
256/// The context needed for a single turn of the conversation.
257#[derive(Debug)]
258pub(crate) struct TurnContext {
259    pub(crate) client: ModelClient,
260    /// The session's current working directory. All relative paths provided by
261    /// the model as well as sandbox policies are resolved against this path
262    /// instead of `std::env::current_dir()`.
263    pub(crate) cwd: PathBuf,
264    pub(crate) base_instructions: Option<String>,
265    pub(crate) user_instructions: Option<String>,
266    pub(crate) approval_policy: AskForApproval,
267    pub(crate) sandbox_policy: SandboxPolicy,
268    pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
269    pub(crate) disable_response_storage: bool,
270    pub(crate) tools_config: ToolsConfig,
271}
272
273impl TurnContext {
274    fn resolve_path(&self, path: Option<String>) -> PathBuf {
275        path.as_ref()
276            .map(PathBuf::from)
277            .map_or_else(|| self.cwd.clone(), |p| self.cwd.join(p))
278    }
279}
280
281/// Configure the model session.
282struct ConfigureSession {
283    /// Provider identifier ("openai", "openrouter", ...).
284    provider: ModelProviderInfo,
285
286    /// If not specified, server will use its default model.
287    model: String,
288
289    model_reasoning_effort: ReasoningEffortConfig,
290    model_reasoning_summary: ReasoningSummaryConfig,
291
292    /// Model instructions that are appended to the base instructions.
293    user_instructions: Option<String>,
294
295    /// Base instructions override.
296    base_instructions: Option<String>,
297
298    /// When to escalate for approval for execution
299    approval_policy: AskForApproval,
300    /// How to sandbox commands executed in the system
301    sandbox_policy: SandboxPolicy,
302    /// Disable server-side response storage (send full context each request)
303    disable_response_storage: bool,
304
305    /// Optional external notifier command tokens. Present only when the
306    /// client wants the agent to spawn a program after each completed
307    /// turn.
308    notify: Option<Vec<String>>,
309
310    /// Working directory that should be treated as the *root* of the
311    /// session. All relative paths supplied by the model as well as the
312    /// execution sandbox are resolved against this directory **instead**
313    /// of the process-wide current working directory. CLI front-ends are
314    /// expected to expand this to an absolute path before sending the
315    /// `ConfigureSession` operation so that the business-logic layer can
316    /// operate deterministically.
317    cwd: PathBuf,
318
319    resume_path: Option<PathBuf>,
320}
321
322impl Session {
    /// Creates the [`Session`] and the initial [`TurnContext`] described by
    /// `configure_session`.
    ///
    /// It also records the initial conversation items (user instructions, if
    /// any, and the environment context). It then emits `SessionConfigured` on
    /// `tx_event`, followed by any deferred start-up error events.
    ///
    /// Rollout setup (new or resumed), MCP connection-manager start-up, default
    /// shell discovery and history-metadata loading run concurrently via
    /// `tokio::join!`.
    ///
    /// # Errors
    ///
    /// - `CodexErr::InvalidWorkingDirectory` if `cwd` is not absolute.
    /// - `CodexErr::McpServer` if `resume_path` is set but the rollout cannot be
    ///   resumed. NOTE(review): `McpServer` looks like a misfit for a rollout
    ///   failure; confirm whether a more specific variant should be used.
    ///
    /// Other start-up failures are non-fatal: creating a fresh rollout
    /// recorder, creating the MCP connection manager, or starting an individual
    /// MCP client. They are logged and reported to the client as `Error` events
    /// sent after `SessionConfigured`.
    async fn new(
        configure_session: ConfigureSession,
        config: Arc<Config>,
        auth: Option<CodexAuth>,
        tx_event: Sender<Event>,
    ) -> Result<(Arc<Self>, TurnContext)> {
        let ConfigureSession {
            provider,
            model,
            model_reasoning_effort,
            model_reasoning_summary,
            user_instructions,
            base_instructions,
            approval_policy,
            sandbox_policy,
            disable_response_storage,
            notify,
            cwd,
            resume_path,
        } = configure_session;
        debug!("Configuring session: model={model}; provider={provider:?}");
        // Relative paths from the model are resolved against `cwd`, so it must
        // already be absolute.
        if !cwd.is_absolute() {
            return Err(CodexErr::InvalidWorkingDirectory(format!(
                "cwd is not absolute: {cwd:?}"
            )));
        }

        // Error messages to dispatch after SessionConfigured is sent.
        let mut post_session_configured_error_events = Vec::<Event>::new();

        // Kick off independent async setup tasks in parallel to reduce startup latency.
        //
        // - initialize RolloutRecorder with new or resumed session info
        // - spin up MCP connection manager
        // - perform default shell discovery
        // - load history metadata
        let rollout_fut = async {
            match resume_path.as_ref() {
                // Resuming: the session id comes from the saved session.
                Some(path) => RolloutRecorder::resume(path, cwd.clone())
                    .await
                    .map(|(rec, saved)| (saved.session_id, Some(saved), rec)),
                // Fresh session: mint a new id and record it in a new rollout.
                None => {
                    let session_id = Uuid::new_v4();
                    RolloutRecorder::new(&config, session_id, user_instructions.clone())
                        .await
                        .map(|rec| (session_id, None, rec))
                }
            }
        };

        let mcp_fut = McpConnectionManager::new(config.mcp_servers.clone());
        let default_shell_fut = shell::default_user_shell();
        let history_meta_fut = crate::message_history::history_metadata(&config);

        // Join all independent futures.
        let (rollout_res, mcp_res, default_shell, (history_log_id, history_entry_count)) =
            tokio::join!(rollout_fut, mcp_fut, default_shell_fut, history_meta_fut);

        // Handle rollout result, which determines the session_id.
        struct RolloutResult {
            session_id: Uuid,
            rollout_recorder: Option<RolloutRecorder>,
            restored_items: Option<Vec<ResponseItem>>,
        }
        let rollout_result = match rollout_res {
            Ok((session_id, maybe_saved, recorder)) => {
                // An empty saved transcript is treated as "nothing to restore".
                let restored_items: Option<Vec<ResponseItem>> =
                    maybe_saved.and_then(|saved_session| {
                        if saved_session.items.is_empty() {
                            None
                        } else {
                            Some(saved_session.items)
                        }
                    });
                RolloutResult {
                    session_id,
                    rollout_recorder: Some(recorder),
                    restored_items,
                }
            }
            Err(e) => {
                // A failed resume aborts session creation.
                // NOTE(review): reported as `McpServer` although this is a
                // rollout failure; confirm the intended variant.
                if let Some(path) = resume_path.as_ref() {
                    return Err(CodexErr::McpServer(format!(
                        "failed to resume rollout from {path:?}: {e}"
                    )));
                }

                // A fresh recorder failing is non-fatal. The session continues
                // without persistence under yet another new id (distinct from
                // the one minted in `rollout_fut`), and the client is told after
                // SessionConfigured.
                let message = format!("failed to initialize rollout recorder: {e}");
                post_session_configured_error_events.push(Event {
                    id: INITIAL_SUBMIT_ID.to_owned(),
                    msg: EventMsg::Error(ErrorEvent {
                        message: message.clone(),
                    }),
                });
                warn!("{message}");

                RolloutResult {
                    session_id: Uuid::new_v4(),
                    rollout_recorder: None,
                    restored_items: None,
                }
            }
        };

        let RolloutResult {
            session_id,
            rollout_recorder,
            restored_items,
        } = rollout_result;

        // Create the mutable state for the Session.
        let mut state = State {
            history: ConversationHistory::new(),
            ..Default::default()
        };
        // Seed the history with the resumed transcript, if any.
        if let Some(restored_items) = restored_items {
            state.history.record_items(&restored_items);
        }

        // Handle MCP manager result and record any startup failures.
        let (mcp_connection_manager, failed_clients) = match mcp_res {
            Ok((mgr, failures)) => (mgr, failures),
            Err(e) => {
                // Fall back to an empty (default) manager so the session still starts.
                let message = format!("Failed to create MCP connection manager: {e:#}");
                error!("{message}");
                post_session_configured_error_events.push(Event {
                    id: INITIAL_SUBMIT_ID.to_owned(),
                    msg: EventMsg::Error(ErrorEvent { message }),
                });
                (McpConnectionManager::default(), Default::default())
            }
        };

        // Surface individual client start-up failures to the user.
        if !failed_clients.is_empty() {
            for (server_name, err) in failed_clients {
                let message = format!("MCP client for `{server_name}` failed to start: {err:#}");
                error!("{message}");
                post_session_configured_error_events.push(Event {
                    id: INITIAL_SUBMIT_ID.to_owned(),
                    msg: EventMsg::Error(ErrorEvent { message }),
                });
            }
        }

        // Now that `session_id` is final (resumed, freshly minted, or the
        // fallback id), construct the model client.
        let client = ModelClient::new(
            config.clone(),
            auth.clone(),
            provider.clone(),
            model_reasoning_effort,
            model_reasoning_summary,
            session_id,
        );
        let turn_context = TurnContext {
            client,
            tools_config: ToolsConfig::new(
                &config.model_family,
                approval_policy,
                sandbox_policy.clone(),
                config.include_plan_tool,
                config.include_apply_patch_tool,
            ),
            user_instructions,
            base_instructions,
            approval_policy,
            sandbox_policy,
            shell_environment_policy: config.shell_environment_policy.clone(),
            cwd,
            disable_response_storage,
        };
        let sess = Arc::new(Session {
            session_id,
            tx_event: tx_event.clone(),
            mcp_connection_manager,
            notify,
            state: Mutex::new(state),
            rollout: Mutex::new(rollout_recorder),
            codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
            user_shell: default_shell,
            show_raw_agent_reasoning: config.show_raw_agent_reasoning,
        });

        // record the initial user instructions and environment context,
        // regardless of whether we restored items.
        let mut conversation_items = Vec::<ResponseItem>::with_capacity(2);
        if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
            conversation_items.push(Prompt::format_user_instructions_message(user_instructions));
        }
        conversation_items.push(ResponseItem::from(EnvironmentContext::new(
            turn_context.cwd.to_path_buf(),
            turn_context.approval_policy,
            turn_context.sandbox_policy.clone(),
        )));
        sess.record_conversation_items(&conversation_items).await;

        // Dispatch the SessionConfiguredEvent first and then report any errors.
        let events = std::iter::once(Event {
            id: INITIAL_SUBMIT_ID.to_owned(),
            msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
                session_id,
                model,
                history_log_id,
                history_entry_count,
            }),
        })
        .chain(post_session_configured_error_events.into_iter());
        for event in events {
            if let Err(e) = tx_event.send(event).await {
                error!("failed to send event: {e:?}");
            }
        }

        Ok((sess, turn_context))
    }
539
540    pub fn set_task(&self, task: AgentTask) {
541        let mut state = self.state.lock_unchecked();
542        if let Some(current_task) = state.current_task.take() {
543            current_task.abort(TurnAbortReason::Replaced);
544        }
545        state.current_task = Some(task);
546    }
547
548    pub fn remove_task(&self, sub_id: &str) {
549        let mut state = self.state.lock_unchecked();
550        if let Some(task) = &state.current_task
551            && task.sub_id == sub_id
552        {
553            state.current_task.take();
554        }
555    }
556
557    /// Sends the given event to the client and swallows the send event, if
558    /// any, logging it as an error.
559    pub(crate) async fn send_event(&self, event: Event) {
560        if let Err(e) = self.tx_event.send(event).await {
561            error!("failed to send tool call event: {e}");
562        }
563    }
564
565    pub async fn request_command_approval(
566        &self,
567        sub_id: String,
568        call_id: String,
569        command: Vec<String>,
570        cwd: PathBuf,
571        reason: Option<String>,
572    ) -> oneshot::Receiver<ReviewDecision> {
573        let (tx_approve, rx_approve) = oneshot::channel();
574        let event = Event {
575            id: sub_id.clone(),
576            msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
577                call_id,
578                command,
579                cwd,
580                reason,
581            }),
582        };
583        let _ = self.tx_event.send(event).await;
584        {
585            let mut state = self.state.lock_unchecked();
586            state.pending_approvals.insert(sub_id, tx_approve);
587        }
588        rx_approve
589    }
590
591    pub async fn request_patch_approval(
592        &self,
593        sub_id: String,
594        call_id: String,
595        action: &ApplyPatchAction,
596        reason: Option<String>,
597        grant_root: Option<PathBuf>,
598    ) -> oneshot::Receiver<ReviewDecision> {
599        let (tx_approve, rx_approve) = oneshot::channel();
600        let event = Event {
601            id: sub_id.clone(),
602            msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
603                call_id,
604                changes: convert_apply_patch_to_protocol(action),
605                reason,
606                grant_root,
607            }),
608        };
609        let _ = self.tx_event.send(event).await;
610        {
611            let mut state = self.state.lock_unchecked();
612            state.pending_approvals.insert(sub_id, tx_approve);
613        }
614        rx_approve
615    }
616
617    pub fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) {
618        let mut state = self.state.lock_unchecked();
619        if let Some(tx_approve) = state.pending_approvals.remove(sub_id) {
620            tx_approve.send(decision).ok();
621        }
622    }
623
624    pub fn add_approved_command(&self, cmd: Vec<String>) {
625        let mut state = self.state.lock_unchecked();
626        state.approved_commands.insert(cmd);
627    }
628
629    /// Records items to both the rollout and the chat completions/ZDR
630    /// transcript, if enabled.
631    async fn record_conversation_items(&self, items: &[ResponseItem]) {
632        debug!("Recording items for conversation: {items:?}");
633        self.record_state_snapshot(items).await;
634
635        self.state.lock_unchecked().history.record_items(items);
636    }
637
638    async fn record_state_snapshot(&self, items: &[ResponseItem]) {
639        let snapshot = { crate::rollout::SessionStateSnapshot {} };
640
641        let recorder = {
642            let guard = self.rollout.lock_unchecked();
643            guard.as_ref().cloned()
644        };
645
646        if let Some(rec) = recorder {
647            if let Err(e) = rec.record_state(snapshot).await {
648                error!("failed to record rollout state: {e:#}");
649            }
650            if let Err(e) = rec.record_items(items).await {
651                error!("failed to record rollout items: {e:#}");
652            }
653        }
654    }
655
656    async fn on_exec_command_begin(
657        &self,
658        turn_diff_tracker: &mut TurnDiffTracker,
659        exec_command_context: ExecCommandContext,
660    ) {
661        let ExecCommandContext {
662            sub_id,
663            call_id,
664            command_for_display,
665            cwd,
666            apply_patch,
667        } = exec_command_context;
668        let msg = match apply_patch {
669            Some(ApplyPatchCommandContext {
670                user_explicitly_approved_this_action,
671                changes,
672            }) => {
673                turn_diff_tracker.on_patch_begin(&changes);
674
675                EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
676                    call_id,
677                    auto_approved: !user_explicitly_approved_this_action,
678                    changes,
679                })
680            }
681            None => EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
682                call_id,
683                command: command_for_display.clone(),
684                cwd,
685                parsed_cmd: parse_command(&command_for_display)
686                    .into_iter()
687                    .map(Into::into)
688                    .collect(),
689            }),
690        };
691        let event = Event {
692            id: sub_id.to_string(),
693            msg,
694        };
695        let _ = self.tx_event.send(event).await;
696    }
697
698    #[allow(clippy::too_many_arguments)]
699    async fn on_exec_command_end(
700        &self,
701        turn_diff_tracker: &mut TurnDiffTracker,
702        sub_id: &str,
703        call_id: &str,
704        output: &ExecToolCallOutput,
705        is_apply_patch: bool,
706    ) {
707        let ExecToolCallOutput {
708            stdout,
709            stderr,
710            duration,
711            exit_code,
712        } = output;
713        // Because stdout and stderr could each be up to 100 KiB, we send
714        // truncated versions.
715        const MAX_STREAM_OUTPUT: usize = 5 * 1024; // 5KiB
716        let stdout = stdout.text.chars().take(MAX_STREAM_OUTPUT).collect();
717        let stderr = stderr.text.chars().take(MAX_STREAM_OUTPUT).collect();
718
719        let msg = if is_apply_patch {
720            EventMsg::PatchApplyEnd(PatchApplyEndEvent {
721                call_id: call_id.to_string(),
722                stdout,
723                stderr,
724                success: *exit_code == 0,
725            })
726        } else {
727            EventMsg::ExecCommandEnd(ExecCommandEndEvent {
728                call_id: call_id.to_string(),
729                stdout,
730                stderr,
731                duration: *duration,
732                exit_code: *exit_code,
733            })
734        };
735
736        let event = Event {
737            id: sub_id.to_string(),
738            msg,
739        };
740        let _ = self.tx_event.send(event).await;
741
742        // If this is an apply_patch, after we emit the end patch, emit a second event
743        // with the full turn diff if there is one.
744        if is_apply_patch {
745            let unified_diff = turn_diff_tracker.get_unified_diff();
746            if let Ok(Some(unified_diff)) = unified_diff {
747                let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
748                let event = Event {
749                    id: sub_id.into(),
750                    msg,
751                };
752                let _ = self.tx_event.send(event).await;
753            }
754        }
755    }
756    /// Runs the exec tool call and emits events for the begin and end of the
757    /// command even on error.
758    ///
759    /// Returns the output of the exec tool call.
760    async fn run_exec_with_events<'a>(
761        &self,
762        turn_diff_tracker: &mut TurnDiffTracker,
763        begin_ctx: ExecCommandContext,
764        exec_args: ExecInvokeArgs<'a>,
765    ) -> crate::error::Result<ExecToolCallOutput> {
766        let is_apply_patch = begin_ctx.apply_patch.is_some();
767        let sub_id = begin_ctx.sub_id.clone();
768        let call_id = begin_ctx.call_id.clone();
769
770        self.on_exec_command_begin(turn_diff_tracker, begin_ctx.clone())
771            .await;
772
773        // TODO: Integrate proper mode management into Session
774        // For now, use default mode restrictions (Build mode) to maintain compatibility
775        let default_mode_restrictions = crate::modes::ModeRestrictions {
776            allow_file_write: true,
777            allow_command_exec: true,
778            allow_network_access: true,
779            allow_git_operations: true,
780            max_file_size: None,
781        };
782
783        let result = process_exec_tool_call(
784            exec_args.params,
785            exec_args.sandbox_type,
786            exec_args.sandbox_policy,
787            exec_args.codex_linux_sandbox_exe,
788            exec_args.stdout_stream,
789            &default_mode_restrictions,
790        )
791        .await;
792
793        let output_stderr;
794        let borrowed: &ExecToolCallOutput = match &result {
795            Ok(output) => output,
796            Err(e) => {
797                output_stderr = ExecToolCallOutput {
798                    exit_code: -1,
799                    stdout: StreamOutput::new(String::new()),
800                    stderr: StreamOutput::new(get_error_message_ui(e)),
801                    duration: Duration::default(),
802                };
803                &output_stderr
804            }
805        };
806        self.on_exec_command_end(
807            turn_diff_tracker,
808            &sub_id,
809            &call_id,
810            borrowed,
811            is_apply_patch,
812        )
813        .await;
814
815        result
816    }
817
818    /// Helper that emits a BackgroundEvent with the given message. This keeps
819    /// the call‑sites terse so adding more diagnostics does not clutter the
820    /// core agent logic.
821    async fn notify_background_event(&self, sub_id: &str, message: impl Into<String>) {
822        let event = Event {
823            id: sub_id.to_string(),
824            msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
825                message: message.into(),
826            }),
827        };
828        let _ = self.tx_event.send(event).await;
829    }
830
831    /// Build the full turn input by concatenating the current conversation
832    /// history with additional items for this turn.
833    pub fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
834        [self.state.lock_unchecked().history.contents(), extra].concat()
835    }
836
837    /// Returns the input if there was no task running to inject into
838    pub fn inject_input(&self, input: Vec<InputItem>) -> std::result::Result<(), Vec<InputItem>> {
839        let mut state = self.state.lock_unchecked();
840        if state.current_task.is_some() {
841            state.pending_input.push(input.into());
842            Ok(())
843        } else {
844            Err(input)
845        }
846    }
847
848    pub fn get_pending_input(&self) -> Vec<ResponseInputItem> {
849        let mut state = self.state.lock_unchecked();
850        if state.pending_input.is_empty() {
851            Vec::with_capacity(0)
852        } else {
853            let mut ret = Vec::new();
854            std::mem::swap(&mut ret, &mut state.pending_input);
855            ret
856        }
857    }
858
859    pub async fn call_tool(
860        &self,
861        server: &str,
862        tool: &str,
863        arguments: Option<serde_json::Value>,
864        timeout: Option<Duration>,
865    ) -> Result<CallToolResult> {
866        self.mcp_connection_manager
867            .call_tool(server, tool, arguments, timeout)
868            .await
869    }
870
871    fn interrupt_task(&self) {
872        info!("interrupt received: abort current task, if any");
873        let mut state = self.state.lock_unchecked();
874        state.pending_approvals.clear();
875        state.pending_input.clear();
876        if let Some(task) = state.current_task.take() {
877            task.abort(TurnAbortReason::Interrupted);
878        }
879    }
880
881    /// Spawn the configured notifier (if any) with the given JSON payload as
882    /// the last argument. Failures are logged but otherwise ignored so that
883    /// notification issues do not interfere with the main workflow.
884    fn maybe_notify(&self, notification: UserNotification) {
885        let Some(notify_command) = &self.notify else {
886            return;
887        };
888
889        if notify_command.is_empty() {
890            return;
891        }
892
893        let Ok(json) = serde_json::to_string(&notification) else {
894            error!("failed to serialise notification payload");
895            return;
896        };
897
898        let mut command = std::process::Command::new(&notify_command[0]);
899        if notify_command.len() > 1 {
900            command.args(&notify_command[1..]);
901        }
902        command.arg(json);
903
904        // Fire-and-forget – we do not wait for completion.
905        if let Err(e) = command.spawn() {
906            warn!("failed to spawn notifier '{}': {e}", notify_command[0]);
907        }
908    }
909}
910
911impl Drop for Session {
912    fn drop(&mut self) {
913        self.interrupt_task();
914    }
915}
916
917#[derive(Clone, Debug)]
918pub(crate) struct ExecCommandContext {
919    pub(crate) sub_id: String,
920    pub(crate) call_id: String,
921    pub(crate) command_for_display: Vec<String>,
922    pub(crate) cwd: PathBuf,
923    pub(crate) apply_patch: Option<ApplyPatchCommandContext>,
924}
925
926#[derive(Clone, Debug)]
927pub(crate) struct ApplyPatchCommandContext {
928    pub(crate) user_explicitly_approved_this_action: bool,
929    pub(crate) changes: HashMap<PathBuf, FileChange>,
930}
931
932/// A series of Turns in response to user input.
933pub(crate) struct AgentTask {
934    sess: Arc<Session>,
935    sub_id: String,
936    handle: AbortHandle,
937}
938
939impl AgentTask {
940    fn spawn(
941        sess: Arc<Session>,
942        turn_context: Arc<TurnContext>,
943        sub_id: String,
944        input: Vec<InputItem>,
945    ) -> Self {
946        let handle = {
947            let sess = sess.clone();
948            let sub_id = sub_id.clone();
949            let tc = Arc::clone(&turn_context);
950            tokio::spawn(async move { run_task(sess, tc.as_ref(), sub_id, input).await })
951                .abort_handle()
952        };
953        Self {
954            sess,
955            sub_id,
956            handle,
957        }
958    }
959
960    fn compact(
961        sess: Arc<Session>,
962        turn_context: Arc<TurnContext>,
963        sub_id: String,
964        input: Vec<InputItem>,
965        compact_instructions: String,
966    ) -> Self {
967        let handle = {
968            let sess = sess.clone();
969            let sub_id = sub_id.clone();
970            let tc = Arc::clone(&turn_context);
971            tokio::spawn(async move {
972                run_compact_task(sess, tc.as_ref(), sub_id, input, compact_instructions).await
973            })
974            .abort_handle()
975        };
976        Self {
977            sess,
978            sub_id,
979            handle,
980        }
981    }
982
983    fn abort(self, reason: TurnAbortReason) {
984        // TOCTOU?
985        if !self.handle.is_finished() {
986            self.handle.abort();
987            let event = Event {
988                id: self.sub_id,
989                msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
990            };
991            let tx_event = self.sess.tx_event.clone();
992            tokio::spawn(async move {
993                tx_event.send(event).await.ok();
994            });
995        }
996    }
997}
998
999async fn submission_loop(
1000    sess: Arc<Session>,
1001    turn_context: TurnContext,
1002    config: Arc<Config>,
1003    rx_sub: Receiver<Submission>,
1004) {
1005    // Wrap once to avoid cloning TurnContext for each task.
1006    let mut turn_context = Arc::new(turn_context);
1007    // To break out of this loop, send Op::Shutdown.
1008    while let Ok(sub) = rx_sub.recv().await {
1009        debug!(?sub, "Submission");
1010        match sub.op {
1011            Op::Interrupt => {
1012                sess.interrupt_task();
1013            }
1014            Op::OverrideTurnContext {
1015                cwd,
1016                approval_policy,
1017                sandbox_policy,
1018                model,
1019                effort,
1020                summary,
1021            } => {
1022                // Recalculate the persistent turn context with provided overrides.
1023                let prev = Arc::clone(&turn_context);
1024                let provider = prev.client.get_provider();
1025
1026                // Effective model + family
1027                let (effective_model, effective_family) = if let Some(m) = model {
1028                    let fam =
1029                        find_family_for_model(&m).unwrap_or_else(|| config.model_family.clone());
1030                    (m, fam)
1031                } else {
1032                    (prev.client.get_model(), prev.client.get_model_family())
1033                };
1034
1035                // Effective reasoning settings
1036                let effective_effort = effort.unwrap_or(prev.client.get_reasoning_effort());
1037                let effective_summary = summary.unwrap_or(prev.client.get_reasoning_summary());
1038
1039                let auth = prev.client.get_auth();
1040                // Build updated config for the client
1041                let mut updated_config = (*config).clone();
1042                updated_config.model = effective_model.clone();
1043                updated_config.model_family = effective_family.clone();
1044
1045                let client = ModelClient::new(
1046                    Arc::new(updated_config),
1047                    auth,
1048                    provider,
1049                    effective_effort,
1050                    effective_summary,
1051                    sess.session_id,
1052                );
1053
1054                let new_approval_policy = approval_policy.unwrap_or(prev.approval_policy);
1055                let new_sandbox_policy = sandbox_policy
1056                    .clone()
1057                    .unwrap_or(prev.sandbox_policy.clone());
1058                let new_cwd = cwd.clone().unwrap_or_else(|| prev.cwd.clone());
1059
1060                let tools_config = ToolsConfig::new(
1061                    &effective_family,
1062                    new_approval_policy,
1063                    new_sandbox_policy.clone(),
1064                    config.include_plan_tool,
1065                    config.include_apply_patch_tool,
1066                );
1067
1068                let new_turn_context = TurnContext {
1069                    client,
1070                    tools_config,
1071                    user_instructions: prev.user_instructions.clone(),
1072                    base_instructions: prev.base_instructions.clone(),
1073                    approval_policy: new_approval_policy,
1074                    sandbox_policy: new_sandbox_policy.clone(),
1075                    shell_environment_policy: prev.shell_environment_policy.clone(),
1076                    cwd: new_cwd.clone(),
1077                    disable_response_storage: prev.disable_response_storage,
1078                };
1079
1080                // Install the new persistent context for subsequent tasks/turns.
1081                turn_context = Arc::new(new_turn_context);
1082                if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
1083                    sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
1084                        new_cwd,
1085                        new_approval_policy,
1086                        new_sandbox_policy,
1087                    ))])
1088                    .await;
1089                }
1090            }
1091            Op::UserInput { items } => {
1092                // attempt to inject input into current task
1093                if let Err(items) = sess.inject_input(items) {
1094                    // no current task, spawn a new one
1095                    let task =
1096                        AgentTask::spawn(sess.clone(), Arc::clone(&turn_context), sub.id, items);
1097                    sess.set_task(task);
1098                }
1099            }
1100            Op::UserTurn {
1101                items,
1102                cwd,
1103                approval_policy,
1104                sandbox_policy,
1105                model,
1106                effort,
1107                summary,
1108            } => {
1109                // attempt to inject input into current task
1110                if let Err(items) = sess.inject_input(items) {
1111                    // Derive a fresh TurnContext for this turn using the provided overrides.
1112                    let provider = turn_context.client.get_provider();
1113
1114                    // Derive a model family for the requested model; fall back to the session's.
1115                    let model_family = find_family_for_model(&model)
1116                        .unwrap_or_else(|| config.model_family.clone());
1117
1118                    // Create a per‑turn Config clone with the requested model/family.
1119                    let mut per_turn_config = (*config).clone();
1120                    per_turn_config.model = model.clone();
1121                    per_turn_config.model_family = model_family.clone();
1122
1123                    // Build a new client with per‑turn reasoning settings.
1124                    // Reuse the same provider and session id; auth defaults to env/API key.
1125                    let client = ModelClient::new(
1126                        Arc::new(per_turn_config),
1127                        None,
1128                        provider,
1129                        effort,
1130                        summary,
1131                        sess.session_id,
1132                    );
1133
1134                    let fresh_turn_context = TurnContext {
1135                        client,
1136                        tools_config: ToolsConfig::new(
1137                            &model_family,
1138                            approval_policy,
1139                            sandbox_policy.clone(),
1140                            config.include_plan_tool,
1141                            config.include_apply_patch_tool,
1142                        ),
1143                        user_instructions: turn_context.user_instructions.clone(),
1144                        base_instructions: turn_context.base_instructions.clone(),
1145                        approval_policy,
1146                        sandbox_policy,
1147                        shell_environment_policy: turn_context.shell_environment_policy.clone(),
1148                        cwd,
1149                        disable_response_storage: turn_context.disable_response_storage,
1150                    };
1151                    // TODO: record the new environment context in the conversation history
1152                    // no current task, spawn a new one with the per‑turn context
1153                    let task =
1154                        AgentTask::spawn(sess.clone(), Arc::new(fresh_turn_context), sub.id, items);
1155                    sess.set_task(task);
1156                }
1157            }
1158            Op::ExecApproval { id, decision } => match decision {
1159                ReviewDecision::Abort => {
1160                    sess.interrupt_task();
1161                }
1162                other => sess.notify_approval(&id, other),
1163            },
1164            Op::PatchApproval { id, decision } => match decision {
1165                ReviewDecision::Abort => {
1166                    sess.interrupt_task();
1167                }
1168                other => sess.notify_approval(&id, other),
1169            },
1170            Op::AddToHistory { text } => {
1171                let id = sess.session_id;
1172                let config = config.clone();
1173                tokio::spawn(async move {
1174                    if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await
1175                    {
1176                        warn!("failed to append to message history: {e}");
1177                    }
1178                });
1179            }
1180
1181            Op::GetHistoryEntryRequest { offset, log_id } => {
1182                let config = config.clone();
1183                let tx_event = sess.tx_event.clone();
1184                let sub_id = sub.id.clone();
1185
1186                tokio::spawn(async move {
1187                    // Run lookup in blocking thread because it does file IO + locking.
1188                    let entry_opt = tokio::task::spawn_blocking(move || {
1189                        crate::message_history::lookup(log_id, offset, &config)
1190                    })
1191                    .await
1192                    .unwrap_or(None);
1193
1194                    let event = Event {
1195                        id: sub_id,
1196                        msg: EventMsg::GetHistoryEntryResponse(
1197                            crate::protocol::GetHistoryEntryResponseEvent {
1198                                offset,
1199                                log_id,
1200                                entry: entry_opt.map(|e| {
1201                                    agcodex_protocol::message_history::HistoryEntry {
1202                                        session_id: e.session_id,
1203                                        ts: e.ts,
1204                                        text: e.text,
1205                                    }
1206                                }),
1207                            },
1208                        ),
1209                    };
1210
1211                    if let Err(e) = tx_event.send(event).await {
1212                        warn!("failed to send GetHistoryEntryResponse event: {e}");
1213                    }
1214                });
1215            }
1216            Op::ListMcpTools => {
1217                let tx_event = sess.tx_event.clone();
1218                let sub_id = sub.id.clone();
1219
1220                // This is a cheap lookup from the connection manager's cache.
1221                let tools = sess.mcp_connection_manager.list_all_tools();
1222                let event = Event {
1223                    id: sub_id,
1224                    msg: EventMsg::McpListToolsResponse(
1225                        crate::protocol::McpListToolsResponseEvent { tools },
1226                    ),
1227                };
1228                if let Err(e) = tx_event.send(event).await {
1229                    warn!("failed to send McpListToolsResponse event: {e}");
1230                }
1231            }
1232            Op::Compact => {
1233                // Create a summarization request as user input
1234                const SUMMARIZATION_PROMPT: &str = include_str!("prompt_for_compact_command.md");
1235
1236                // Attempt to inject input into current task
1237                if let Err(items) = sess.inject_input(vec![InputItem::Text {
1238                    text: "Start Summarization".to_string(),
1239                }]) {
1240                    let task = AgentTask::compact(
1241                        sess.clone(),
1242                        Arc::clone(&turn_context),
1243                        sub.id,
1244                        items,
1245                        SUMMARIZATION_PROMPT.to_string(),
1246                    );
1247                    sess.set_task(task);
1248                }
1249            }
1250            Op::Shutdown => {
1251                info!("Shutting down Codex instance");
1252
1253                // Gracefully flush and shutdown rollout recorder on session end so tests
1254                // that inspect the rollout file do not race with the background writer.
1255                let recorder_opt = sess.rollout.lock_unchecked().take();
1256                if let Some(rec) = recorder_opt
1257                    && let Err(e) = rec.shutdown().await
1258                {
1259                    warn!("failed to shutdown rollout recorder: {e}");
1260                    let event = Event {
1261                        id: sub.id.clone(),
1262                        msg: EventMsg::Error(ErrorEvent {
1263                            message: "Failed to shutdown rollout recorder".to_string(),
1264                        }),
1265                    };
1266                    if let Err(e) = sess.tx_event.send(event).await {
1267                        warn!("failed to send error message: {e:?}");
1268                    }
1269                }
1270
1271                let event = Event {
1272                    id: sub.id.clone(),
1273                    msg: EventMsg::ShutdownComplete,
1274                };
1275                if let Err(e) = sess.tx_event.send(event).await {
1276                    warn!("failed to send Shutdown event: {e}");
1277                }
1278                break;
1279            }
1280            _ => {
1281                // Ignore unknown ops; enum is non_exhaustive to allow extensions.
1282            }
1283        }
1284    }
1285    debug!("Agent loop exited");
1286}
1287
1288/// Takes a user message as input and runs a loop where, at each turn, the model
1289/// replies with either:
1290///
1291/// - requested function calls
1292/// - an assistant message
1293///
1294/// While it is possible for the model to return multiple of these items in a
1295/// single turn, in practice, we generally one item per turn:
1296///
1297/// - If the model requests a function call, we execute it and send the output
1298///   back to the model in the next turn.
1299/// - If the model sends only an assistant message, we record it in the
1300///   conversation history and consider the task complete.
1301async fn run_task(
1302    sess: Arc<Session>,
1303    turn_context: &TurnContext,
1304    sub_id: String,
1305    input: Vec<InputItem>,
1306) {
1307    if input.is_empty() {
1308        return;
1309    }
1310    let event = Event {
1311        id: sub_id.clone(),
1312        msg: EventMsg::TaskStarted,
1313    };
1314    if sess.tx_event.send(event).await.is_err() {
1315        return;
1316    }
1317
1318    let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
1319    sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
1320        .await;
1321
1322    let mut last_agent_message: Option<String> = None;
1323    // Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
1324    // many turns, from the perspective of the user, it is a single turn.
1325    let mut turn_diff_tracker = TurnDiffTracker::new();
1326
1327    loop {
1328        // Note that pending_input would be something like a message the user
1329        // submitted through the UI while the model was running. Though the UI
1330        // may support this, the model might not.
1331        let pending_input = sess
1332            .get_pending_input()
1333            .into_iter()
1334            .map(ResponseItem::from)
1335            .collect::<Vec<ResponseItem>>();
1336        sess.record_conversation_items(&pending_input).await;
1337
1338        // Construct the input that we will send to the model. When using the
1339        // Chat completions API (or ZDR clients), the model needs the full
1340        // conversation history on each turn. The rollout file, however, should
1341        // only record the new items that originated in this turn so that it
1342        // represents an append-only log without duplicates.
1343        let turn_input: Vec<ResponseItem> = sess.turn_input_with_history(pending_input);
1344
1345        let turn_input_messages: Vec<String> = turn_input
1346            .iter()
1347            .filter_map(|item| match item {
1348                ResponseItem::Message { content, .. } => Some(content),
1349                _ => None,
1350            })
1351            .flat_map(|content| {
1352                content.iter().filter_map(|item| match item {
1353                    ContentItem::OutputText { text } => Some(text.clone()),
1354                    _ => None,
1355                })
1356            })
1357            .collect();
1358        match run_turn(
1359            &sess,
1360            turn_context,
1361            &mut turn_diff_tracker,
1362            sub_id.clone(),
1363            turn_input,
1364        )
1365        .await
1366        {
1367            Ok(turn_output) => {
1368                let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
1369                let mut responses = Vec::<ResponseInputItem>::new();
1370                for processed_response_item in turn_output {
1371                    let ProcessedResponseItem { item, response } = processed_response_item;
1372                    match (&item, &response) {
1373                        (ResponseItem::Message { role, .. }, None) if role == "assistant" => {
1374                            // If the model returned a message, we need to record it.
1375                            items_to_record_in_conversation_history.push(item);
1376                        }
1377                        (
1378                            ResponseItem::LocalShellCall { .. },
1379                            Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
1380                        ) => {
1381                            items_to_record_in_conversation_history.push(item);
1382                            items_to_record_in_conversation_history.push(
1383                                ResponseItem::FunctionCallOutput {
1384                                    call_id: call_id.clone(),
1385                                    output: output.clone(),
1386                                },
1387                            );
1388                        }
1389                        (
1390                            ResponseItem::FunctionCall { .. },
1391                            Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
1392                        ) => {
1393                            items_to_record_in_conversation_history.push(item);
1394                            items_to_record_in_conversation_history.push(
1395                                ResponseItem::FunctionCallOutput {
1396                                    call_id: call_id.clone(),
1397                                    output: output.clone(),
1398                                },
1399                            );
1400                        }
1401                        (
1402                            ResponseItem::FunctionCall { .. },
1403                            Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
1404                        ) => {
1405                            items_to_record_in_conversation_history.push(item);
1406                            let (content, success): (String, Option<bool>) = match result {
1407                                Ok(CallToolResult {
1408                                    content,
1409                                    is_error,
1410                                    structured_content: _,
1411                                }) => match serde_json::to_string(content) {
1412                                    Ok(content) => (content, *is_error),
1413                                    Err(e) => {
1414                                        warn!("Failed to serialize MCP tool call output: {e}");
1415                                        (e.to_string(), Some(true))
1416                                    }
1417                                },
1418                                Err(e) => (e.clone(), Some(true)),
1419                            };
1420                            items_to_record_in_conversation_history.push(
1421                                ResponseItem::FunctionCallOutput {
1422                                    call_id: call_id.clone(),
1423                                    output: FunctionCallOutputPayload { content, success },
1424                                },
1425                            );
1426                        }
1427                        (
1428                            ResponseItem::Reasoning {
1429                                id,
1430                                summary,
1431                                content,
1432                                encrypted_content,
1433                            },
1434                            None,
1435                        ) => {
1436                            items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
1437                                id: id.clone(),
1438                                summary: summary.clone(),
1439                                content: content.clone(),
1440                                encrypted_content: encrypted_content.clone(),
1441                            });
1442                        }
1443                        _ => {
1444                            warn!("Unexpected response item: {item:?} with response: {response:?}");
1445                        }
1446                    };
1447                    if let Some(response) = response {
1448                        responses.push(response);
1449                    }
1450                }
1451
1452                // Only attempt to take the lock if there is something to record.
1453                if !items_to_record_in_conversation_history.is_empty() {
1454                    sess.record_conversation_items(&items_to_record_in_conversation_history)
1455                        .await;
1456                }
1457
1458                if responses.is_empty() {
1459                    debug!("Turn completed");
1460                    last_agent_message = get_last_assistant_message_from_turn(
1461                        &items_to_record_in_conversation_history,
1462                    );
1463                    sess.maybe_notify(UserNotification::AgentTurnComplete {
1464                        turn_id: sub_id.clone(),
1465                        input_messages: turn_input_messages,
1466                        last_assistant_message: last_agent_message.clone(),
1467                    });
1468                    break;
1469                }
1470            }
1471            Err(e) => {
1472                info!("Turn error: {e:#}");
1473                let event = Event {
1474                    id: sub_id.clone(),
1475                    msg: EventMsg::Error(ErrorEvent {
1476                        message: e.to_string(),
1477                    }),
1478                };
1479                sess.tx_event.send(event).await.ok();
1480                // let the user continue the conversation
1481                break;
1482            }
1483        }
1484    }
1485    sess.remove_task(&sub_id);
1486    let event = Event {
1487        id: sub_id,
1488        msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
1489    };
1490    sess.tx_event.send(event).await.ok();
1491}
1492
1493async fn run_turn(
1494    sess: &Session,
1495    turn_context: &TurnContext,
1496    turn_diff_tracker: &mut TurnDiffTracker,
1497    sub_id: String,
1498    input: Vec<ResponseItem>,
1499) -> CodexResult<Vec<ProcessedResponseItem>> {
1500    let tools = get_openai_tools(
1501        &turn_context.tools_config,
1502        Some(sess.mcp_connection_manager.list_all_tools()),
1503    );
1504
1505    let prompt = Prompt {
1506        input,
1507        store: !turn_context.disable_response_storage,
1508        tools,
1509        base_instructions_override: turn_context.base_instructions.clone(),
1510    };
1511
1512    let mut retries = 0;
1513    loop {
1514        match try_run_turn(sess, turn_context, turn_diff_tracker, &sub_id, &prompt).await {
1515            Ok(output) => return Ok(output),
1516            Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
1517            Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
1518            Err(e @ (CodexErr::UsageLimitReached(_) | CodexErr::UsageNotIncluded)) => {
1519                return Err(e);
1520            }
1521            Err(e) => {
1522                // Use the configured provider-specific stream retry budget.
1523                let max_retries = turn_context.client.get_provider().stream_max_retries();
1524                if retries < max_retries {
1525                    retries += 1;
1526                    let delay = match e {
1527                        CodexErr::Stream(_, Some(delay)) => delay,
1528                        _ => backoff(retries),
1529                    };
1530                    warn!(
1531                        "stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
1532                    );
1533
1534                    // Surface retry information to any UI/front‑end so the
1535                    // user understands what is happening instead of staring
1536                    // at a seemingly frozen screen.
1537                    sess.notify_background_event(
1538                        &sub_id,
1539                        format!(
1540                            "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…"
1541                        ),
1542                    )
1543                    .await;
1544
1545                    tokio::time::sleep(delay).await;
1546                } else {
1547                    return Err(e);
1548                }
1549            }
1550        }
1551    }
1552}
1553
/// When the model is prompted, it returns a stream of events. Some of these
/// events map to a `ResponseItem`. A `ResponseItem` may need to be
/// "handled" such that it produces a `ResponseInputItem` that needs to be
/// sent back to the model on the next turn.
#[derive(Debug)]
struct ProcessedResponseItem {
    /// The item exactly as the model emitted it (from an `OutputItemDone` event).
    item: ResponseItem,
    /// The reply produced by `handle_response_item`, if any. It is `None` for
    /// items such as assistant messages and reasoning, which need no answer.
    response: Option<ResponseInputItem>,
}
1563
1564async fn try_run_turn(
1565    sess: &Session,
1566    turn_context: &TurnContext,
1567    turn_diff_tracker: &mut TurnDiffTracker,
1568    sub_id: &str,
1569    prompt: &Prompt,
1570) -> CodexResult<Vec<ProcessedResponseItem>> {
1571    // call_ids that are part of this response.
1572    let completed_call_ids = prompt
1573        .input
1574        .iter()
1575        .filter_map(|ri| match ri {
1576            ResponseItem::FunctionCallOutput { call_id, .. } => Some(call_id),
1577            ResponseItem::LocalShellCall {
1578                call_id: Some(call_id),
1579                ..
1580            } => Some(call_id),
1581            _ => None,
1582        })
1583        .collect::<Vec<_>>();
1584
1585    // call_ids that were pending but are not part of this response.
1586    // This usually happens because the user interrupted the model before we responded to one of its tool calls
1587    // and then the user sent a follow-up message.
1588    let missing_calls = {
1589        prompt
1590            .input
1591            .iter()
1592            .filter_map(|ri| match ri {
1593                ResponseItem::FunctionCall { call_id, .. } => Some(call_id),
1594                ResponseItem::LocalShellCall {
1595                    call_id: Some(call_id),
1596                    ..
1597                } => Some(call_id),
1598                _ => None,
1599            })
1600            .filter_map(|call_id| {
1601                if completed_call_ids.contains(&call_id) {
1602                    None
1603                } else {
1604                    Some(call_id.clone())
1605                }
1606            })
1607            .map(|call_id| ResponseItem::FunctionCallOutput {
1608                call_id: call_id.clone(),
1609                output: FunctionCallOutputPayload {
1610                    content: "aborted".to_string(),
1611                    success: Some(false),
1612                },
1613            })
1614            .collect::<Vec<_>>()
1615    };
1616    let prompt: Cow<Prompt> = if missing_calls.is_empty() {
1617        Cow::Borrowed(prompt)
1618    } else {
1619        // Add the synthetic aborted missing calls to the beginning of the input to ensure all call ids have responses.
1620        let input = [missing_calls, prompt.input.clone()].concat();
1621        Cow::Owned(Prompt {
1622            input,
1623            ..prompt.clone()
1624        })
1625    };
1626
1627    let mut stream = turn_context.client.clone().stream(&prompt).await?;
1628
1629    let mut output = Vec::new();
1630    loop {
1631        // Poll the next item from the model stream. We must inspect *both* Ok and Err
1632        // cases so that transient stream failures (e.g., dropped SSE connection before
1633        // `response.completed`) bubble up and trigger the caller's retry logic.
1634        let event = stream.next().await;
1635        let Some(event) = event else {
1636            // Channel closed without yielding a final Completed event or explicit error.
1637            // Treat as a disconnected stream so the caller can retry.
1638            return Err(CodexErr::Stream(
1639                "stream closed before response.completed".into(),
1640                None,
1641            ));
1642        };
1643
1644        let event = match event {
1645            Ok(ev) => ev,
1646            Err(e) => {
1647                // Propagate the underlying stream error to the caller (run_turn), which
1648                // will apply the configured `stream_max_retries` policy.
1649                return Err(e);
1650            }
1651        };
1652
1653        match event {
1654            ResponseEvent::Created => {}
1655            ResponseEvent::OutputItemDone(item) => {
1656                let response = handle_response_item(
1657                    sess,
1658                    turn_context,
1659                    turn_diff_tracker,
1660                    sub_id,
1661                    item.clone(),
1662                )
1663                .await?;
1664                output.push(ProcessedResponseItem { item, response });
1665            }
1666            ResponseEvent::Completed {
1667                response_id: _,
1668                token_usage,
1669            } => {
1670                if let Some(token_usage) = token_usage {
1671                    sess.tx_event
1672                        .send(Event {
1673                            id: sub_id.to_string(),
1674                            msg: EventMsg::TokenCount(token_usage),
1675                        })
1676                        .await
1677                        .ok();
1678                }
1679
1680                let unified_diff = turn_diff_tracker.get_unified_diff();
1681                if let Ok(Some(unified_diff)) = unified_diff {
1682                    let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
1683                    let event = Event {
1684                        id: sub_id.to_string(),
1685                        msg,
1686                    };
1687                    let _ = sess.tx_event.send(event).await;
1688                }
1689
1690                return Ok(output);
1691            }
1692            ResponseEvent::OutputTextDelta(delta) => {
1693                {
1694                    let mut st = sess.state.lock_unchecked();
1695                    st.history.append_assistant_text(&delta);
1696                }
1697
1698                let event = Event {
1699                    id: sub_id.to_string(),
1700                    msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
1701                };
1702                sess.tx_event.send(event).await.ok();
1703            }
1704            ResponseEvent::ReasoningSummaryDelta(delta) => {
1705                let event = Event {
1706                    id: sub_id.to_string(),
1707                    msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
1708                };
1709                sess.tx_event.send(event).await.ok();
1710            }
1711            ResponseEvent::ReasoningSummaryPartAdded => {
1712                let event = Event {
1713                    id: sub_id.to_string(),
1714                    msg: EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}),
1715                };
1716                sess.tx_event.send(event).await.ok();
1717            }
1718            ResponseEvent::ReasoningContentDelta(delta) => {
1719                if sess.show_raw_agent_reasoning {
1720                    let event = Event {
1721                        id: sub_id.to_string(),
1722                        msg: EventMsg::AgentReasoningRawContentDelta(
1723                            AgentReasoningRawContentDeltaEvent { delta },
1724                        ),
1725                    };
1726                    sess.tx_event.send(event).await.ok();
1727                }
1728            }
1729        }
1730    }
1731}
1732
1733async fn run_compact_task(
1734    sess: Arc<Session>,
1735    turn_context: &TurnContext,
1736    sub_id: String,
1737    input: Vec<InputItem>,
1738    compact_instructions: String,
1739) {
1740    let start_event = Event {
1741        id: sub_id.clone(),
1742        msg: EventMsg::TaskStarted,
1743    };
1744    if sess.tx_event.send(start_event).await.is_err() {
1745        return;
1746    }
1747
1748    let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
1749    let turn_input: Vec<ResponseItem> =
1750        sess.turn_input_with_history(vec![initial_input_for_turn.clone().into()]);
1751
1752    let prompt = Prompt {
1753        input: turn_input,
1754        store: !turn_context.disable_response_storage,
1755        tools: Vec::new(),
1756        base_instructions_override: Some(compact_instructions.clone()),
1757    };
1758
1759    let max_retries = turn_context.client.get_provider().stream_max_retries();
1760    let mut retries = 0;
1761
1762    loop {
1763        let attempt_result = drain_to_completed(&sess, turn_context, &sub_id, &prompt).await;
1764
1765        match attempt_result {
1766            Ok(()) => break,
1767            Err(CodexErr::Interrupted) => return,
1768            Err(e) => {
1769                if retries < max_retries {
1770                    retries += 1;
1771                    let delay = backoff(retries);
1772                    sess.notify_background_event(
1773                        &sub_id,
1774                        format!(
1775                            "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…"
1776                        ),
1777                    )
1778                    .await;
1779                    tokio::time::sleep(delay).await;
1780                    continue;
1781                } else {
1782                    let event = Event {
1783                        id: sub_id.clone(),
1784                        msg: EventMsg::Error(ErrorEvent {
1785                            message: e.to_string(),
1786                        }),
1787                    };
1788                    sess.send_event(event).await;
1789                    return;
1790                }
1791            }
1792        }
1793    }
1794
1795    sess.remove_task(&sub_id);
1796    let event = Event {
1797        id: sub_id.clone(),
1798        msg: EventMsg::AgentMessage(AgentMessageEvent {
1799            message: "Compact task completed".to_string(),
1800        }),
1801    };
1802    sess.send_event(event).await;
1803    let event = Event {
1804        id: sub_id.clone(),
1805        msg: EventMsg::TaskComplete(TaskCompleteEvent {
1806            last_agent_message: None,
1807        }),
1808    };
1809    sess.send_event(event).await;
1810
1811    let mut state = sess.state.lock_unchecked();
1812    state.history.keep_last_messages(1);
1813}
1814
1815async fn handle_response_item(
1816    sess: &Session,
1817    turn_context: &TurnContext,
1818    turn_diff_tracker: &mut TurnDiffTracker,
1819    sub_id: &str,
1820    item: ResponseItem,
1821) -> CodexResult<Option<ResponseInputItem>> {
1822    debug!(?item, "Output item");
1823    let output = match item {
1824        ResponseItem::Message { content, .. } => {
1825            for item in content {
1826                if let ContentItem::OutputText { text } = item {
1827                    let event = Event {
1828                        id: sub_id.to_string(),
1829                        msg: EventMsg::AgentMessage(AgentMessageEvent { message: text }),
1830                    };
1831                    sess.tx_event.send(event).await.ok();
1832                }
1833            }
1834            None
1835        }
1836        ResponseItem::Reasoning {
1837            id: _,
1838            summary,
1839            content,
1840            encrypted_content: _,
1841        } => {
1842            for item in summary {
1843                let text = match item {
1844                    ReasoningItemReasoningSummary::SummaryText { text } => text,
1845                };
1846                let event = Event {
1847                    id: sub_id.to_string(),
1848                    msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
1849                };
1850                sess.tx_event.send(event).await.ok();
1851            }
1852            if sess.show_raw_agent_reasoning
1853                && let Some(content) = content
1854            {
1855                for item in content {
1856                    let text = match item {
1857                        ReasoningItemContent::ReasoningText { text } => text,
1858                        ReasoningItemContent::Text { text } => text,
1859                    };
1860                    let event = Event {
1861                        id: sub_id.to_string(),
1862                        msg: EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
1863                            text,
1864                        }),
1865                    };
1866                    sess.tx_event.send(event).await.ok();
1867                }
1868            }
1869            None
1870        }
1871        ResponseItem::FunctionCall {
1872            name,
1873            arguments,
1874            call_id,
1875            ..
1876        } => {
1877            info!("FunctionCall: {arguments}");
1878            Some(
1879                handle_function_call(
1880                    sess,
1881                    turn_context,
1882                    turn_diff_tracker,
1883                    sub_id.to_string(),
1884                    name,
1885                    arguments,
1886                    call_id,
1887                )
1888                .await,
1889            )
1890        }
1891        ResponseItem::LocalShellCall {
1892            id,
1893            call_id,
1894            status: _,
1895            action,
1896        } => {
1897            let LocalShellAction::Exec(action) = action;
1898            tracing::info!("LocalShellCall: {action:?}");
1899            let params = ShellToolCallParams {
1900                command: action.command,
1901                workdir: action.working_directory,
1902                timeout_ms: action.timeout_ms,
1903                with_escalated_permissions: None,
1904                justification: None,
1905            };
1906            let effective_call_id = match (call_id, id) {
1907                (Some(call_id), _) => call_id,
1908                (None, Some(id)) => id,
1909                (None, None) => {
1910                    error!("LocalShellCall without call_id or id");
1911                    return Ok(Some(ResponseInputItem::FunctionCallOutput {
1912                        call_id: "".to_string(),
1913                        output: FunctionCallOutputPayload {
1914                            content: "LocalShellCall without call_id or id".to_string(),
1915                            success: None,
1916                        },
1917                    }));
1918                }
1919            };
1920
1921            let exec_params = to_exec_params(params, turn_context);
1922            Some(
1923                handle_container_exec_with_params(
1924                    exec_params,
1925                    sess,
1926                    turn_context,
1927                    turn_diff_tracker,
1928                    sub_id.to_string(),
1929                    effective_call_id,
1930                )
1931                .await,
1932            )
1933        }
1934        ResponseItem::FunctionCallOutput { .. } => {
1935            debug!("unexpected FunctionCallOutput from stream");
1936            None
1937        }
1938        ResponseItem::Other => None,
1939    };
1940    Ok(output)
1941}
1942
1943async fn handle_function_call(
1944    sess: &Session,
1945    turn_context: &TurnContext,
1946    turn_diff_tracker: &mut TurnDiffTracker,
1947    sub_id: String,
1948    name: String,
1949    arguments: String,
1950    call_id: String,
1951) -> ResponseInputItem {
1952    match name.as_str() {
1953        "container.exec" | "shell" => {
1954            let params = match parse_container_exec_arguments(arguments, turn_context, &call_id) {
1955                Ok(params) => params,
1956                Err(output) => {
1957                    return *output;
1958                }
1959            };
1960            handle_container_exec_with_params(
1961                params,
1962                sess,
1963                turn_context,
1964                turn_diff_tracker,
1965                sub_id,
1966                call_id,
1967            )
1968            .await
1969        }
1970        "apply_patch" => {
1971            let args = match serde_json::from_str::<ApplyPatchToolArgs>(&arguments) {
1972                Ok(a) => a,
1973                Err(e) => {
1974                    return ResponseInputItem::FunctionCallOutput {
1975                        call_id,
1976                        output: FunctionCallOutputPayload {
1977                            content: format!("failed to parse function arguments: {e}"),
1978                            success: None,
1979                        },
1980                    };
1981                }
1982            };
1983            let exec_params = ExecParams {
1984                command: vec!["apply_patch".to_string(), args.input.clone()],
1985                cwd: turn_context.cwd.clone(),
1986                timeout_ms: None,
1987                env: HashMap::new(),
1988                with_escalated_permissions: None,
1989                justification: None,
1990            };
1991            handle_container_exec_with_params(
1992                exec_params,
1993                sess,
1994                turn_context,
1995                turn_diff_tracker,
1996                sub_id,
1997                call_id,
1998            )
1999            .await
2000        }
2001        "update_plan" => handle_update_plan(sess, arguments, sub_id, call_id).await,
2002        _ => {
2003            match sess.mcp_connection_manager.parse_tool_name(&name) {
2004                Some((server, tool_name)) => {
2005                    // TODO(mbolin): Determine appropriate timeout for tool call.
2006                    let timeout = None;
2007                    handle_mcp_tool_call(
2008                        sess, &sub_id, call_id, server, tool_name, arguments, timeout,
2009                    )
2010                    .await
2011                }
2012                None => {
2013                    // Unknown function: reply with structured failure so the model can adapt.
2014                    ResponseInputItem::FunctionCallOutput {
2015                        call_id,
2016                        output: FunctionCallOutputPayload {
2017                            content: format!("unsupported call: {name}"),
2018                            success: None,
2019                        },
2020                    }
2021                }
2022            }
2023        }
2024    }
2025}
2026
2027fn to_exec_params(params: ShellToolCallParams, turn_context: &TurnContext) -> ExecParams {
2028    ExecParams {
2029        command: params.command,
2030        cwd: turn_context.resolve_path(params.workdir.clone()),
2031        timeout_ms: params.timeout_ms,
2032        env: create_env(&turn_context.shell_environment_policy),
2033        with_escalated_permissions: params.with_escalated_permissions,
2034        justification: params.justification,
2035    }
2036}
2037
2038fn parse_container_exec_arguments(
2039    arguments: String,
2040    turn_context: &TurnContext,
2041    call_id: &str,
2042) -> std::result::Result<ExecParams, Box<ResponseInputItem>> {
2043    // parse command
2044    match serde_json::from_str::<ShellToolCallParams>(&arguments) {
2045        Ok(shell_tool_call_params) => Ok(to_exec_params(shell_tool_call_params, turn_context)),
2046        Err(e) => {
2047            // allow model to re-sample
2048            let output = ResponseInputItem::FunctionCallOutput {
2049                call_id: call_id.to_string(),
2050                output: FunctionCallOutputPayload {
2051                    content: format!("failed to parse function arguments: {e}"),
2052                    success: None,
2053                },
2054            };
2055            Err(Box::new(output))
2056        }
2057    }
2058}
2059
/// Everything `Session::run_exec_with_events` needs to launch a command: the
/// process parameters, plus how (and whether) to sandbox it.
pub struct ExecInvokeArgs<'a> {
    /// Command line, working directory, environment, timeout, and escalation
    /// request for the process.
    pub params: ExecParams,
    /// Sandbox mechanism to run under. Callers in this file pass
    /// `SandboxType::None` once the user has approved running unsandboxed.
    pub sandbox_type: SandboxType,
    /// Sandbox policy to apply; callers here pass the turn context's policy.
    pub sandbox_policy: &'a SandboxPolicy,
    /// Path of the Linux sandbox helper executable, if configured
    /// (presumably only consulted on Linux; confirm in `process_exec_tool_call`).
    pub codex_linux_sandbox_exe: &'a Option<PathBuf>,
    /// When set, output is presumably streamed as events tagged with this
    /// stream's `sub_id`/`call_id` through its `tx_event` channel.
    pub stdout_stream: Option<StdoutStream>,
}
2067
2068fn maybe_run_with_user_profile(
2069    params: ExecParams,
2070    sess: &Session,
2071    turn_context: &TurnContext,
2072) -> ExecParams {
2073    if turn_context.shell_environment_policy.use_profile {
2074        let command = sess
2075            .user_shell
2076            .format_default_shell_invocation(params.command.clone());
2077        if let Some(command) = command {
2078            return ExecParams { command, ..params };
2079        }
2080    }
2081    params
2082}
2083
/// Executes a shell-style tool call and returns the `FunctionCallOutput` to
/// send back to the model. Callers in this file use it for `shell` /
/// `container.exec`, `apply_patch`, and local shell calls.
///
/// Flow:
/// 1. If the command is an `apply_patch` invocation, `apply_patch::apply_patch`
///    either produces the reply directly or delegates to exec. In the
///    delegated case the patch is run by re-invoking the current executable
///    with `CODEX_APPLY_PATCH_ARG1`. A patch that fails verification is
///    returned to the model for resampling.
/// 2. A `SafetyCheck` decides between auto-approve (with a sandbox type),
///    asking the user, or rejecting.
/// 3. The command runs via `run_exec_with_events`. Sandbox failures are
///    handed to `handle_sandbox_error`.
///
/// Never returns an error: failures are reported as `FunctionCallOutput`
/// content. `success` is `Some(exit_code == 0)` for a completed run and
/// `None` for rejections and other execution errors.
async fn handle_container_exec_with_params(
    params: ExecParams,
    sess: &Session,
    turn_context: &TurnContext,
    turn_diff_tracker: &mut TurnDiffTracker,
    sub_id: String,
    call_id: String,
) -> ResponseInputItem {
    // check if this was a patch, and apply it if so
    let apply_patch_exec = match maybe_parse_apply_patch_verified(&params.command, &params.cwd) {
        MaybeApplyPatchVerified::Body(changes) => {
            // `apply_patch` either finishes the call itself (Output) or asks
            // us to run it as a subprocess (DelegateToExec).
            match apply_patch::apply_patch(sess, turn_context, &sub_id, &call_id, changes).await {
                InternalApplyPatchInvocation::Output(item) => return item,
                InternalApplyPatchInvocation::DelegateToExec(apply_patch_exec) => {
                    Some(apply_patch_exec)
                }
            }
        }
        MaybeApplyPatchVerified::CorrectnessError(parse_error) => {
            // It looks like an invocation of `apply_patch`, but we
            // could not resolve it into a patch that would apply
            // cleanly. Return to model for resample.
            return ResponseInputItem::FunctionCallOutput {
                call_id,
                output: FunctionCallOutputPayload {
                    content: format!("error: {parse_error:#}"),
                    success: None,
                },
            };
        }
        MaybeApplyPatchVerified::ShellParseError(error) => {
            // Not fatal: fall through and treat it as an ordinary command.
            trace!("Failed to parse shell command, {error:?}");
            None
        }
        MaybeApplyPatchVerified::NotApplyPatch => None,
    };

    // Decide what actually runs, how it is vetted, and what the UI displays.
    let (params, safety, command_for_display) = match &apply_patch_exec {
        Some(ApplyPatchExec {
            action: ApplyPatchAction { patch, cwd, .. },
            user_explicitly_approved_this_action,
        }) => {
            // The patch is applied by re-executing this binary in its
            // apply-patch mode, so we need our own path.
            let path_to_codex = std::env::current_exe()
                .ok()
                .map(|p| p.to_string_lossy().to_string());
            let Some(path_to_codex) = path_to_codex else {
                return ResponseInputItem::FunctionCallOutput {
                    call_id,
                    output: FunctionCallOutputPayload {
                        content: "failed to determine path to codex executable".to_string(),
                        success: None,
                    },
                };
            };

            // Note: the environment is deliberately empty here, unlike the
            // policy-derived env used for regular shell commands.
            let params = ExecParams {
                command: vec![
                    path_to_codex,
                    CODEX_APPLY_PATCH_ARG1.to_string(),
                    patch.clone(),
                ],
                cwd: cwd.clone(),
                timeout_ms: params.timeout_ms,
                env: HashMap::new(),
                with_escalated_permissions: params.with_escalated_permissions,
                justification: params.justification.clone(),
            };
            // A patch the user already approved skips both re-prompting and
            // the sandbox.
            let safety = if *user_explicitly_approved_this_action {
                SafetyCheck::AutoApprove {
                    sandbox_type: SandboxType::None,
                }
            } else {
                assess_safety_for_untrusted_command(
                    turn_context.approval_policy,
                    &turn_context.sandbox_policy,
                    params.with_escalated_permissions.unwrap_or(false),
                )
            };
            // Show the user `apply_patch <patch>` rather than the internal
            // re-exec command line.
            (
                params,
                safety,
                vec!["apply_patch".to_string(), patch.clone()],
            )
        }
        None => {
            // Scope the state guard so it is released before any `.await`.
            let safety = {
                let state = sess.state.lock_unchecked();
                assess_command_safety(
                    &params.command,
                    turn_context.approval_policy,
                    &turn_context.sandbox_policy,
                    &state.approved_commands,
                    params.with_escalated_permissions.unwrap_or(false),
                )
            };
            let command_for_display = params.command.clone();
            (params, safety, command_for_display)
        }
    };

    let sandbox_type = match safety {
        SafetyCheck::AutoApprove { sandbox_type } => sandbox_type,
        SafetyCheck::AskUser => {
            let rx_approve = sess
                .request_command_approval(
                    sub_id.clone(),
                    call_id.clone(),
                    params.command.clone(),
                    params.cwd.clone(),
                    params.justification.clone(),
                )
                .await;
            // If the approval channel is dropped, this falls back to
            // `ReviewDecision::default()` (its value is not visible here).
            match rx_approve.await.unwrap_or_default() {
                ReviewDecision::Approved => (),
                ReviewDecision::ApprovedForSession => {
                    // NOTE(review): for apply_patch this records the internal
                    // re-exec command (including the patch text), not the
                    // displayed `apply_patch` command. Confirm this is intended.
                    sess.add_approved_command(params.command.clone());
                }
                ReviewDecision::Denied | ReviewDecision::Abort => {
                    return ResponseInputItem::FunctionCallOutput {
                        call_id,
                        output: FunctionCallOutputPayload {
                            content: "exec command rejected by user".to_string(),
                            success: None,
                        },
                    };
                }
            }
            // No sandboxing is applied because the user has given
            // explicit approval. Often, we end up in this case because
            // the command cannot be run in a sandbox, such as
            // installing a new dependency that requires network access.
            SandboxType::None
        }
        SafetyCheck::Reject { reason } => {
            return ResponseInputItem::FunctionCallOutput {
                call_id,
                output: FunctionCallOutputPayload {
                    content: format!("exec command rejected: {reason}"),
                    success: None,
                },
            };
        }
    };

    // Context attached to the begin/end events emitted while the command runs.
    let exec_command_context = ExecCommandContext {
        sub_id: sub_id.clone(),
        call_id: call_id.clone(),
        command_for_display: command_for_display.clone(),
        cwd: params.cwd.clone(),
        apply_patch: apply_patch_exec.map(
            |ApplyPatchExec {
                 action,
                 user_explicitly_approved_this_action,
             }| ApplyPatchCommandContext {
                user_explicitly_approved_this_action,
                changes: convert_apply_patch_to_protocol(&action),
            },
        ),
    };

    // The user-profile wrapping happens only after the safety decision, so
    // approval and safety checks above saw the unwrapped command.
    let params = maybe_run_with_user_profile(params, sess, turn_context);
    let output_result = sess
        .run_exec_with_events(
            turn_diff_tracker,
            exec_command_context.clone(),
            ExecInvokeArgs {
                params: params.clone(),
                sandbox_type,
                sandbox_policy: &turn_context.sandbox_policy,
                codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
                stdout_stream: Some(StdoutStream {
                    sub_id: sub_id.clone(),
                    call_id: call_id.clone(),
                    tx_event: sess.tx_event.clone(),
                }),
            },
        )
        .await;

    match output_result {
        Ok(output) => {
            let ExecToolCallOutput { exit_code, .. } = &output;

            let is_success = *exit_code == 0;
            let content = format_exec_output(output);
            ResponseInputItem::FunctionCallOutput {
                call_id: call_id.clone(),
                output: FunctionCallOutputPayload {
                    content,
                    success: Some(is_success),
                },
            }
        }
        // Sandbox failures may be retried unsandboxed, depending on the
        // approval policy; see `handle_sandbox_error`.
        Err(CodexErr::Sandbox(error)) => {
            handle_sandbox_error(
                turn_diff_tracker,
                params,
                exec_command_context,
                error,
                sandbox_type,
                sess,
                turn_context,
            )
            .await
        }
        Err(e) => ResponseInputItem::FunctionCallOutput {
            call_id: call_id.clone(),
            output: FunctionCallOutputPayload {
                content: format!("execution error: {e}"),
                success: None,
            },
        },
    }
}
2298
2299async fn handle_sandbox_error(
2300    turn_diff_tracker: &mut TurnDiffTracker,
2301    params: ExecParams,
2302    exec_command_context: ExecCommandContext,
2303    error: SandboxErr,
2304    sandbox_type: SandboxType,
2305    sess: &Session,
2306    turn_context: &TurnContext,
2307) -> ResponseInputItem {
2308    let call_id = exec_command_context.call_id.clone();
2309    let sub_id = exec_command_context.sub_id.clone();
2310    let cwd = exec_command_context.cwd.clone();
2311
2312    // Early out if either the user never wants to be asked for approval, or
2313    // we're letting the model manage escalation requests. Otherwise, continue
2314    match turn_context.approval_policy {
2315        AskForApproval::Never | AskForApproval::OnRequest => {
2316            return ResponseInputItem::FunctionCallOutput {
2317                call_id,
2318                output: FunctionCallOutputPayload {
2319                    content: format!(
2320                        "failed in sandbox {sandbox_type:?} with execution error: {error}"
2321                    ),
2322                    success: Some(false),
2323                },
2324            };
2325        }
2326        AskForApproval::UnlessTrusted | AskForApproval::OnFailure => (),
2327    }
2328
2329    // similarly, if the command timed out, we can simply return this failure to the model
2330    if matches!(error, SandboxErr::Timeout) {
2331        return ResponseInputItem::FunctionCallOutput {
2332            call_id,
2333            output: FunctionCallOutputPayload {
2334                content: format!(
2335                    "command timed out after {} milliseconds",
2336                    params.timeout_duration().as_millis()
2337                ),
2338                success: Some(false),
2339            },
2340        };
2341    }
2342
2343    // Note that when `error` is `SandboxErr::Denied`, it could be a false
2344    // positive. That is, it may have exited with a non-zero exit code, not
2345    // because the sandbox denied it, but because that is its expected behavior,
2346    // i.e., a grep command that did not match anything. Ideally we would
2347    // include additional metadata on the command to indicate whether non-zero
2348    // exit codes merit a retry.
2349
2350    // For now, we categorically ask the user to retry without sandbox and
2351    // emit the raw error as a background event.
2352    sess.notify_background_event(&sub_id, format!("Execution failed: {error}"))
2353        .await;
2354
2355    let rx_approve = sess
2356        .request_command_approval(
2357            sub_id.clone(),
2358            call_id.clone(),
2359            params.command.clone(),
2360            cwd.clone(),
2361            Some("command failed; retry without sandbox?".to_string()),
2362        )
2363        .await;
2364
2365    match rx_approve.await.unwrap_or_default() {
2366        ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {
2367            // Persist this command as pre‑approved for the
2368            // remainder of the session so future
2369            // executions skip the sandbox directly.
2370            // TODO(ragona): Isn't this a bug? It always saves the command in an | fork?
2371            sess.add_approved_command(params.command.clone());
2372            // Inform UI we are retrying without sandbox.
2373            sess.notify_background_event(&sub_id, "retrying command without sandbox")
2374                .await;
2375
2376            // This is an escalated retry; the policy will not be
2377            // examined and the sandbox has been set to `None`.
2378            let retry_output_result = sess
2379                .run_exec_with_events(
2380                    turn_diff_tracker,
2381                    exec_command_context.clone(),
2382                    ExecInvokeArgs {
2383                        params,
2384                        sandbox_type: SandboxType::None,
2385                        sandbox_policy: &turn_context.sandbox_policy,
2386                        codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
2387                        stdout_stream: Some(StdoutStream {
2388                            sub_id: sub_id.clone(),
2389                            call_id: call_id.clone(),
2390                            tx_event: sess.tx_event.clone(),
2391                        }),
2392                    },
2393                )
2394                .await;
2395
2396            match retry_output_result {
2397                Ok(retry_output) => {
2398                    let ExecToolCallOutput { exit_code, .. } = &retry_output;
2399
2400                    let is_success = *exit_code == 0;
2401                    let content = format_exec_output(retry_output);
2402
2403                    ResponseInputItem::FunctionCallOutput {
2404                        call_id: call_id.clone(),
2405                        output: FunctionCallOutputPayload {
2406                            content,
2407                            success: Some(is_success),
2408                        },
2409                    }
2410                }
2411                Err(e) => ResponseInputItem::FunctionCallOutput {
2412                    call_id: call_id.clone(),
2413                    output: FunctionCallOutputPayload {
2414                        content: format!("retry failed: {e}"),
2415                        success: None,
2416                    },
2417                },
2418            }
2419        }
2420        ReviewDecision::Denied | ReviewDecision::Abort => {
2421            // Fall through to original failure handling.
2422            ResponseInputItem::FunctionCallOutput {
2423                call_id,
2424                output: FunctionCallOutputPayload {
2425                    content: "exec command rejected by user".to_string(),
2426                    success: None,
2427                },
2428            }
2429        }
2430    }
2431}
2432
2433/// Exec output is a pre-serialized JSON payload
2434fn format_exec_output(exec_output: ExecToolCallOutput) -> String {
2435    let ExecToolCallOutput {
2436        exit_code,
2437        stdout,
2438        stderr,
2439        duration,
2440    } = exec_output;
2441
2442    #[derive(Serialize)]
2443    struct ExecMetadata {
2444        exit_code: i32,
2445        duration_seconds: f32,
2446    }
2447
2448    #[derive(Serialize)]
2449    struct ExecOutput<'a> {
2450        output: &'a str,
2451        metadata: ExecMetadata,
2452    }
2453
2454    // round to 1 decimal place
2455    let duration_seconds = ((duration.as_secs_f32()) * 10.0).round() / 10.0;
2456
2457    let is_success = exit_code == 0;
2458    let output = if is_success { stdout } else { stderr };
2459
2460    let mut formatted_output = output.text;
2461    if let Some(truncated_after_lines) = output.truncated_after_lines {
2462        formatted_output.push_str(&format!(
2463            "\n\n[Output truncated after {truncated_after_lines} lines: too many lines or bytes.]",
2464        ));
2465    }
2466
2467    let payload = ExecOutput {
2468        output: &formatted_output,
2469        metadata: ExecMetadata {
2470            exit_code,
2471            duration_seconds,
2472        },
2473    };
2474
2475    #[expect(clippy::expect_used)]
2476    serde_json::to_string(&payload).expect("serialize ExecOutput")
2477}
2478
2479fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
2480    responses.iter().rev().find_map(|item| {
2481        if let ResponseItem::Message { role, content, .. } = item {
2482            if role == "assistant" {
2483                content.iter().rev().find_map(|ci| {
2484                    if let ContentItem::OutputText { text } = ci {
2485                        Some(text.clone())
2486                    } else {
2487                        None
2488                    }
2489                })
2490            } else {
2491                None
2492            }
2493        } else {
2494            None
2495        }
2496    })
2497}
2498
2499async fn drain_to_completed(
2500    sess: &Session,
2501    turn_context: &TurnContext,
2502    sub_id: &str,
2503    prompt: &Prompt,
2504) -> CodexResult<()> {
2505    let mut stream = turn_context.client.clone().stream(prompt).await?;
2506    loop {
2507        let maybe_event = stream.next().await;
2508        let Some(event) = maybe_event else {
2509            return Err(CodexErr::Stream(
2510                "stream closed before response.completed".into(),
2511                None,
2512            ));
2513        };
2514        match event {
2515            Ok(ResponseEvent::OutputItemDone(item)) => {
2516                // Record only to in-memory conversation history; avoid state snapshot.
2517                let mut state = sess.state.lock_unchecked();
2518                state.history.record_items(std::slice::from_ref(&item));
2519            }
2520            Ok(ResponseEvent::Completed {
2521                response_id: _,
2522                token_usage,
2523            }) => {
2524                let token_usage = match token_usage {
2525                    Some(usage) => usage,
2526                    None => {
2527                        return Err(CodexErr::Stream(
2528                            "token_usage was None in ResponseEvent::Completed".into(),
2529                            None,
2530                        ));
2531                    }
2532                };
2533                sess.tx_event
2534                    .send(Event {
2535                        id: sub_id.to_string(),
2536                        msg: EventMsg::TokenCount(token_usage),
2537                    })
2538                    .await
2539                    .ok();
2540                return Ok(());
2541            }
2542            Ok(_) => continue,
2543            Err(e) => return Err(e),
2544        }
2545    }
2546}