1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::{Arc, RwLock};
4
5use async_stream::try_stream;
6use futures::{Stream, StreamExt};
7
8use crate::codex_options::CodexOptions;
9use crate::errors::{Error, Result};
10use crate::events::{ThreadError, ThreadEvent, Usage};
11use crate::exec::{CodexExec, CodexExecArgs};
12use crate::items::ThreadItem;
13use crate::output_schema_file::create_output_schema_file;
14use crate::thread_options::ThreadOptions;
15use crate::turn_options::TurnOptions;
16
/// A single piece of user-supplied input for a turn.
///
/// A list of these can be wrapped in [`Input::Entries`]; `normalize_input`
/// then separates the text fragments from the image paths.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UserInput {
    /// A fragment of prompt text.
    Text {
        /// The text itself. Fragments are joined with a blank line (`"\n\n"`)
        /// when the prompt is assembled.
        text: String,
    },
    /// An image referenced by a filesystem path.
    LocalImage {
        /// Path of the image. It is converted lossily to a string when the
        /// input is normalized, so non-UTF-8 path bytes are replaced.
        path: PathBuf,
    },
}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
34pub enum Input {
35 Text(String),
37 Entries(Vec<UserInput>),
39}
40
41impl From<&str> for Input {
42 fn from(value: &str) -> Self {
43 Self::Text(value.to_string())
44 }
45}
46
47impl From<String> for Input {
48 fn from(value: String) -> Self {
49 Self::Text(value)
50 }
51}
52
53impl From<Vec<UserInput>> for Input {
54 fn from(value: Vec<UserInput>) -> Self {
55 Self::Entries(value)
56 }
57}
58
59#[derive(Debug, Clone, PartialEq)]
61pub struct Turn {
62 pub items: Vec<ThreadItem>,
64 pub final_response: String,
66 pub usage: Option<Usage>,
68}
69
70pub type RunResult = Turn;
72
73pub type ThreadEventStream = Pin<Box<dyn Stream<Item = Result<ThreadEvent>> + Send>>;
75
76pub struct RunStreamedResult {
78 pub events: ThreadEventStream,
80}
81
82#[derive(Debug, Clone)]
84pub struct Thread {
85 exec: CodexExec,
86 options: CodexOptions,
87 thread_options: ThreadOptions,
88 id: Arc<RwLock<Option<String>>>,
89}
90
91impl Thread {
92 pub(crate) fn new(
93 exec: CodexExec,
94 options: CodexOptions,
95 thread_options: ThreadOptions,
96 id: Option<String>,
97 ) -> Self {
98 Self {
99 exec,
100 options,
101 thread_options,
102 id: Arc::new(RwLock::new(id)),
103 }
104 }
105
106 pub fn id(&self) -> Option<String> {
119 self.id.read().unwrap_or_else(|e| e.into_inner()).clone()
120 }
121
122 pub async fn run_streamed(
144 &self,
145 input: impl Into<Input>,
146 turn_options: Option<TurnOptions>,
147 ) -> Result<RunStreamedResult> {
148 let input: Input = input.into();
149 let turn_options = turn_options.unwrap_or_default();
150 let schema_file = create_output_schema_file(turn_options.output_schema.as_ref())?;
151
152 let (prompt, images) = normalize_input(input);
153 let output_schema_file = schema_file
154 .as_ref()
155 .map(|file| file.path().to_string_lossy().into_owned());
156
157 let exec_args = CodexExecArgs {
158 input: prompt,
159 base_url: self.options.base_url.clone(),
160 api_key: self.options.api_key.clone(),
161 thread_id: self.id(),
162 images,
163 model: self.thread_options.model.clone(),
164 sandbox_mode: self.thread_options.sandbox_mode,
165 working_directory: self.thread_options.working_directory.clone(),
166 additional_directories: self
167 .thread_options
168 .additional_directories
169 .clone()
170 .unwrap_or_default(),
171 skip_git_repo_check: self.thread_options.skip_git_repo_check.unwrap_or(false),
172 output_schema_file,
173 model_reasoning_effort: self.thread_options.model_reasoning_effort,
174 network_access_enabled: self.thread_options.network_access_enabled,
175 web_search_mode: self.thread_options.web_search_mode,
176 web_search_enabled: self.thread_options.web_search_enabled,
177 approval_policy: self.thread_options.approval_policy,
178 cancellation_token: turn_options.cancellation_token.clone(),
179 };
180
181 let line_stream = self.exec.run(exec_args).await?;
182 let id_handle = Arc::clone(&self.id);
183
184 let events = try_stream! {
185 let _schema_file = schema_file;
186 let mut line_stream = line_stream;
187
188 while let Some(line_result) = line_stream.next().await {
189 let line = line_result?;
190 let event: ThreadEvent = serde_json::from_str(&line)
191 .map_err(|e| Error::JsonParse(format!("{e}: {line}")))?;
192
193 if let ThreadEvent::ThreadStarted { thread_id } = &event {
194 *id_handle.write().unwrap_or_else(|e| e.into_inner()) = Some(thread_id.clone());
195 }
196
197 yield event;
198 }
199 };
200
201 Ok(RunStreamedResult {
202 events: Box::pin(events),
203 })
204 }
205
206 pub async fn run(
222 &self,
223 input: impl Into<Input>,
224 turn_options: Option<TurnOptions>,
225 ) -> Result<Turn> {
226 let streamed = self.run_streamed(input, turn_options).await?;
227 let mut events = streamed.events;
228
229 let mut items = Vec::new();
230 let mut final_response = String::new();
231 let mut usage = None;
232 let mut turn_failure: Option<ThreadError> = None;
233 let mut stream_error: Option<String> = None;
234
235 while let Some(event_result) = events.next().await {
236 let event = event_result?;
237 match event {
238 ThreadEvent::ItemCompleted { item } => {
239 if let ThreadItem::AgentMessage(agent_message) = &item {
240 final_response = agent_message.text.clone();
241 }
242 items.push(item);
243 }
244 ThreadEvent::TurnCompleted { usage: turn_usage } => {
245 usage = Some(turn_usage);
246 }
247 ThreadEvent::TurnFailed { error } => {
248 turn_failure = Some(error);
249 break;
250 }
251 ThreadEvent::Error { message } => {
252 stream_error = Some(message);
253 break;
254 }
255 _ => {}
256 }
257 }
258
259 if let Some(error) = turn_failure {
260 return Err(Error::ThreadRun(error.message));
261 }
262 if let Some(message) = stream_error {
263 return Err(Error::ThreadRun(message));
264 }
265
266 Ok(Turn {
267 items,
268 final_response,
269 usage,
270 })
271 }
272}
273
274fn normalize_input(input: Input) -> (String, Vec<String>) {
275 match input {
276 Input::Text(text) => (text, Vec::new()),
277 Input::Entries(entries) => {
278 let mut prompt_parts = Vec::new();
279 let mut images = Vec::new();
280
281 for entry in entries {
282 match entry {
283 UserInput::Text { text } => prompt_parts.push(text),
284 UserInput::LocalImage { path } => {
285 images.push(path.to_string_lossy().to_string())
286 }
287 }
288 }
289
290 (prompt_parts.join("\n\n"), images)
291 }
292 }
293}