Skip to main content

codex/
thread.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::{Arc, RwLock};
4
5use async_stream::try_stream;
6use futures::{Stream, StreamExt};
7
8use crate::codex_options::CodexOptions;
9use crate::errors::{Error, Result};
10use crate::events::{ThreadError, ThreadEvent, Usage};
11use crate::exec::{CodexExec, CodexExecArgs};
12use crate::items::ThreadItem;
13use crate::output_schema_file::create_output_schema_file;
14use crate::thread_options::ThreadOptions;
15use crate::turn_options::TurnOptions;
16
/// One piece of a multimodal prompt.
///
/// A sequence of these makes up an [`Input::Entries`] prompt; entry order is kept.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum UserInput {
    /// Text that becomes part of the prompt.
    Text {
        /// The text to include.
        text: String,
    },
    /// An image on the local filesystem, handed to Codex with `--image`.
    LocalImage {
        /// Filesystem path of the image.
        path: PathBuf,
    },
}
31
32/// Input accepted by [`Thread::run`] / [`Thread::run_streamed`].
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub enum Input {
35    /// Single text prompt.
36    Text(String),
37    /// Ordered multimodal entries.
38    Entries(Vec<UserInput>),
39}
40
41impl From<&str> for Input {
42    fn from(value: &str) -> Self {
43        Self::Text(value.to_string())
44    }
45}
46
47impl From<String> for Input {
48    fn from(value: String) -> Self {
49        Self::Text(value)
50    }
51}
52
53impl From<Vec<UserInput>> for Input {
54    fn from(value: Vec<UserInput>) -> Self {
55        Self::Entries(value)
56    }
57}
58
59/// Completed turn.
60#[derive(Debug, Clone, PartialEq)]
61pub struct Turn {
62    /// Completed items emitted during the turn.
63    pub items: Vec<ThreadItem>,
64    /// Final assistant response text from the latest `agent_message` item.
65    pub final_response: String,
66    /// Token usage when reported by the CLI.
67    pub usage: Option<Usage>,
68}
69
70/// Alias for [`Turn`] to describe the result of [`Thread::run`].
71pub type RunResult = Turn;
72
73/// Stream of thread events.
74pub type ThreadEventStream = Pin<Box<dyn Stream<Item = Result<ThreadEvent>> + Send>>;
75
76/// Result of [`Thread::run_streamed`].
77pub struct RunStreamedResult {
78    /// Event stream for the current turn.
79    pub events: ThreadEventStream,
80}
81
82/// Represents a thread of conversation with the agent.
83#[derive(Debug, Clone)]
84pub struct Thread {
85    exec: CodexExec,
86    options: CodexOptions,
87    thread_options: ThreadOptions,
88    id: Arc<RwLock<Option<String>>>,
89}
90
91impl Thread {
92    pub(crate) fn new(
93        exec: CodexExec,
94        options: CodexOptions,
95        thread_options: ThreadOptions,
96        id: Option<String>,
97    ) -> Self {
98        Self {
99            exec,
100            options,
101            thread_options,
102            id: Arc::new(RwLock::new(id)),
103        }
104    }
105
106    /// Returns the current thread id, if available.
107    ///
108    /// # Example
109    ///
110    /// ```rust,no_run
111    /// use codex::Codex;
112    ///
113    /// let codex = Codex::new(None)?;
114    /// let thread = codex.start_thread(None);
115    /// let _id = thread.id();
116    /// # Ok::<(), codex::Error>(())
117    /// ```
118    pub fn id(&self) -> Option<String> {
119        self.id.read().unwrap_or_else(|e| e.into_inner()).clone()
120    }
121
122    /// Provides input to the agent and streams events as they are produced.
123    ///
124    /// # Example
125    ///
126    /// ```rust,no_run
127    /// use codex::{Codex, ThreadEvent};
128    /// use futures::StreamExt;
129    ///
130    /// # async fn example() -> codex::Result<()> {
131    /// let codex = Codex::new(None)?;
132    /// let thread = codex.start_thread(None);
133    /// let mut events = thread.run_streamed("Review this code", None).await?.events;
134    ///
135    /// while let Some(event) = events.next().await {
136    ///     if let ThreadEvent::TurnCompleted { usage } = event? {
137    ///         println!("{usage:?}");
138    ///     }
139    /// }
140    /// # Ok(())
141    /// # }
142    /// ```
143    pub async fn run_streamed(
144        &self,
145        input: impl Into<Input>,
146        turn_options: Option<TurnOptions>,
147    ) -> Result<RunStreamedResult> {
148        let input: Input = input.into();
149        let turn_options = turn_options.unwrap_or_default();
150        let schema_file = create_output_schema_file(turn_options.output_schema.as_ref())?;
151
152        let (prompt, images) = normalize_input(input);
153        let output_schema_file = schema_file
154            .as_ref()
155            .map(|file| file.path().to_string_lossy().into_owned());
156
157        let exec_args = CodexExecArgs {
158            input: prompt,
159            base_url: self.options.base_url.clone(),
160            api_key: self.options.api_key.clone(),
161            thread_id: self.id(),
162            images,
163            model: self.thread_options.model.clone(),
164            sandbox_mode: self.thread_options.sandbox_mode,
165            working_directory: self.thread_options.working_directory.clone(),
166            additional_directories: self
167                .thread_options
168                .additional_directories
169                .clone()
170                .unwrap_or_default(),
171            skip_git_repo_check: self.thread_options.skip_git_repo_check.unwrap_or(false),
172            output_schema_file,
173            model_reasoning_effort: self.thread_options.model_reasoning_effort,
174            network_access_enabled: self.thread_options.network_access_enabled,
175            web_search_mode: self.thread_options.web_search_mode,
176            web_search_enabled: self.thread_options.web_search_enabled,
177            approval_policy: self.thread_options.approval_policy,
178            cancellation_token: turn_options.cancellation_token.clone(),
179        };
180
181        let line_stream = self.exec.run(exec_args).await?;
182        let id_handle = Arc::clone(&self.id);
183
184        let events = try_stream! {
185            let _schema_file = schema_file;
186            let mut line_stream = line_stream;
187
188            while let Some(line_result) = line_stream.next().await {
189                let line = line_result?;
190                let event: ThreadEvent = serde_json::from_str(&line)
191                    .map_err(|e| Error::JsonParse(format!("{e}: {line}")))?;
192
193                if let ThreadEvent::ThreadStarted { thread_id } = &event {
194                    *id_handle.write().unwrap_or_else(|e| e.into_inner()) = Some(thread_id.clone());
195                }
196
197                yield event;
198            }
199        };
200
201        Ok(RunStreamedResult {
202            events: Box::pin(events),
203        })
204    }
205
206    /// Provides input to the agent and returns the completed turn.
207    ///
208    /// # Example
209    ///
210    /// ```rust,no_run
211    /// use codex::Codex;
212    ///
213    /// # async fn example() -> codex::Result<()> {
214    /// let codex = Codex::new(None)?;
215    /// let thread = codex.start_thread(None);
216    /// let turn = thread.run("Summarize current repository state", None).await?;
217    /// println!("{}", turn.final_response);
218    /// # Ok(())
219    /// # }
220    /// ```
221    pub async fn run(
222        &self,
223        input: impl Into<Input>,
224        turn_options: Option<TurnOptions>,
225    ) -> Result<Turn> {
226        let streamed = self.run_streamed(input, turn_options).await?;
227        let mut events = streamed.events;
228
229        let mut items = Vec::new();
230        let mut final_response = String::new();
231        let mut usage = None;
232        let mut turn_failure: Option<ThreadError> = None;
233        let mut stream_error: Option<String> = None;
234
235        while let Some(event_result) = events.next().await {
236            let event = event_result?;
237            match event {
238                ThreadEvent::ItemCompleted { item } => {
239                    if let ThreadItem::AgentMessage(agent_message) = &item {
240                        final_response = agent_message.text.clone();
241                    }
242                    items.push(item);
243                }
244                ThreadEvent::TurnCompleted { usage: turn_usage } => {
245                    usage = Some(turn_usage);
246                }
247                ThreadEvent::TurnFailed { error } => {
248                    turn_failure = Some(error);
249                    break;
250                }
251                ThreadEvent::Error { message } => {
252                    stream_error = Some(message);
253                    break;
254                }
255                _ => {}
256            }
257        }
258
259        if let Some(error) = turn_failure {
260            return Err(Error::ThreadRun(error.message));
261        }
262        if let Some(message) = stream_error {
263            return Err(Error::ThreadRun(message));
264        }
265
266        Ok(Turn {
267            items,
268            final_response,
269            usage,
270        })
271    }
272}
273
274fn normalize_input(input: Input) -> (String, Vec<String>) {
275    match input {
276        Input::Text(text) => (text, Vec::new()),
277        Input::Entries(entries) => {
278            let mut prompt_parts = Vec::new();
279            let mut images = Vec::new();
280
281            for entry in entries {
282                match entry {
283                    UserInput::Text { text } => prompt_parts.push(text),
284                    UserInput::LocalImage { path } => {
285                        images.push(path.to_string_lossy().to_string())
286                    }
287                }
288            }
289
290            (prompt_parts.join("\n\n"), images)
291        }
292    }
293}