1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3
4use serde_json::{Map, Value};
5use thiserror::Error;
6use tokio::sync::{Mutex, mpsc};
7use tokio::task::JoinHandle;
8
9use crate::CodexClient;
10use crate::client::StdioConfig;
11use crate::client::WsConfig;
12use crate::error::ClientError;
13use crate::events::{ServerEvent, ServerNotification};
14use crate::protocol::shared::EmptyObject;
15use crate::protocol::{requests, responses};
16use crate::schema::OpenAiSerializable;
17
// Page-size constant for thread listing. Its use site is outside this part of
// the file; presumably the per-request limit passed to `thread_list` — TODO confirm.
const THREAD_LIST_PAGE_LIMIT: u32 = 100;
// Upper bound on thread-list pages. Its use site is outside this part of the
// file; presumably guards a pagination loop against running forever — TODO confirm.
const MAX_THREAD_LIST_PAGES: usize = 100;
20
/// Approval policy selector stored in `ThreadOptions::approval_policy` and
/// `TurnOptions::approval_policy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApprovalMode {
    /// Rendered as `"never"`.
    Never,
    /// Rendered as `"on-request"`.
    OnRequest,
    /// Rendered as `"on-failure"`.
    OnFailure,
    /// Rendered as `"untrusted"`.
    Untrusted,
}

impl ApprovalMode {
    /// Returns the fixed kebab-case string identifier of this mode.
    fn as_str(self) -> &'static str {
        match self {
            ApprovalMode::Never => "never",
            ApprovalMode::OnRequest => "on-request",
            ApprovalMode::OnFailure => "on-failure",
            ApprovalMode::Untrusted => "untrusted",
        }
    }
}
39
/// Sandbox mode selector stored in `ThreadOptions::sandbox_mode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SandboxMode {
    /// Rendered as `"read-only"`.
    ReadOnly,
    /// Rendered as `"workspace-write"`.
    WorkspaceWrite,
    /// Rendered as `"danger-full-access"`.
    DangerFullAccess,
}

impl SandboxMode {
    /// Returns the fixed kebab-case string identifier of this mode.
    fn as_str(self) -> &'static str {
        match self {
            SandboxMode::ReadOnly => "read-only",
            SandboxMode::WorkspaceWrite => "workspace-write",
            SandboxMode::DangerFullAccess => "danger-full-access",
        }
    }
}
56
/// Reasoning-effort selector used by `ThreadOptions`, `TurnOptions` and
/// `CollaborationModeSettings`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModelReasoningEffort {
    /// Rendered as `"none"`.
    None,
    /// Rendered as `"minimal"`.
    Minimal,
    /// Rendered as `"low"`.
    Low,
    /// Rendered as `"medium"`.
    Medium,
    /// Rendered as `"high"`.
    High,
    /// Rendered as `"xhigh"`.
    XHigh,
}

impl ModelReasoningEffort {
    /// Returns the fixed lowercase string identifier of this effort level.
    fn as_str(self) -> &'static str {
        match self {
            ModelReasoningEffort::None => "none",
            ModelReasoningEffort::Minimal => "minimal",
            ModelReasoningEffort::Low => "low",
            ModelReasoningEffort::Medium => "medium",
            ModelReasoningEffort::High => "high",
            ModelReasoningEffort::XHigh => "xhigh",
        }
    }
}
79
/// Reasoning-summary selector stored in the `model_reasoning_summary` field of
/// `ThreadOptions` and `TurnOptions`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModelReasoningSummary {
    /// Rendered as `"none"`.
    None,
    /// Rendered as `"auto"`.
    Auto,
    /// Rendered as `"concise"`.
    Concise,
    /// Rendered as `"detailed"`.
    Detailed,
}

impl ModelReasoningSummary {
    /// Returns the fixed lowercase string identifier of this summary setting.
    fn as_str(self) -> &'static str {
        match self {
            ModelReasoningSummary::None => "none",
            ModelReasoningSummary::Auto => "auto",
            ModelReasoningSummary::Concise => "concise",
            ModelReasoningSummary::Detailed => "detailed",
        }
    }
}
98
/// Personality selector stored in the `personality` field of `ThreadOptions`
/// and `TurnOptions`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Personality {
    /// Rendered as `"none"`.
    None,
    /// Rendered as `"friendly"`.
    Friendly,
    /// Rendered as `"pragmatic"`.
    Pragmatic,
}

impl Personality {
    /// Returns the fixed lowercase string identifier of this personality.
    fn as_str(self) -> &'static str {
        match self {
            Personality::None => "none",
            Personality::Friendly => "friendly",
            Personality::Pragmatic => "pragmatic",
        }
    }
}
115
/// Web-search mode selector stored in the `web_search_mode` field of
/// `ThreadOptions` and `TurnOptions`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebSearchMode {
    /// Rendered as `"disabled"`.
    Disabled,
    /// Rendered as `"cached"`.
    Cached,
    /// Rendered as `"live"`.
    Live,
}

impl WebSearchMode {
    /// Returns the fixed lowercase string identifier of this mode.
    fn as_str(self) -> &'static str {
        match self {
            WebSearchMode::Disabled => "disabled",
            WebSearchMode::Cached => "cached",
            WebSearchMode::Live => "live",
        }
    }
}
132
/// Kind tag of a `CollaborationMode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollaborationModeKind {
    /// Rendered as `"plan"`.
    Plan,
    /// Rendered as `"default"`.
    Default,
}

impl CollaborationModeKind {
    /// Returns the fixed lowercase string identifier of this kind.
    fn as_str(self) -> &'static str {
        match self {
            CollaborationModeKind::Plan => "plan",
            CollaborationModeKind::Default => "default",
        }
    }
}
147
148#[derive(Debug, Clone, PartialEq, Eq)]
149pub struct CollaborationModeSettings {
150 pub model: String,
151 pub reasoning_effort: Option<ModelReasoningEffort>,
152 pub developer_instructions: Option<String>,
153}
154
155impl CollaborationModeSettings {
156 pub fn new(model: impl Into<String>) -> Self {
157 Self {
158 model: model.into(),
159 reasoning_effort: None,
160 developer_instructions: None,
161 }
162 }
163
164 pub fn with_reasoning_effort(mut self, reasoning_effort: ModelReasoningEffort) -> Self {
165 self.reasoning_effort = Some(reasoning_effort);
166 self
167 }
168
169 pub fn with_developer_instructions(
170 mut self,
171 developer_instructions: impl Into<String>,
172 ) -> Self {
173 self.developer_instructions = Some(developer_instructions.into());
174 self
175 }
176}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
179pub struct CollaborationMode {
180 pub mode: CollaborationModeKind,
181 pub settings: CollaborationModeSettings,
182}
183
184impl CollaborationMode {
185 pub fn new(mode: CollaborationModeKind, settings: CollaborationModeSettings) -> Self {
186 Self { mode, settings }
187 }
188
189 fn as_value(&self) -> Value {
190 let mut settings = Map::new();
191 settings.insert(
192 "model".to_string(),
193 Value::String(self.settings.model.clone()),
194 );
195 if let Some(reasoning_effort) = self.settings.reasoning_effort {
196 settings.insert(
197 "reasoning_effort".to_string(),
198 Value::String(reasoning_effort.as_str().to_string()),
199 );
200 }
201 if let Some(instructions) = &self.settings.developer_instructions {
202 settings.insert(
203 "developer_instructions".to_string(),
204 Value::String(instructions.clone()),
205 );
206 }
207
208 let mut value = Map::new();
209 value.insert(
210 "mode".to_string(),
211 Value::String(self.mode.as_str().to_string()),
212 );
213 value.insert("settings".to_string(), Value::Object(settings));
214 Value::Object(value)
215 }
216}
217
218#[derive(Debug, Clone, PartialEq)]
219pub struct DynamicToolSpec {
220 pub name: String,
221 pub description: String,
222 pub input_schema: Value,
223}
224
225impl DynamicToolSpec {
226 pub fn new(
227 name: impl Into<String>,
228 description: impl Into<String>,
229 input_schema: Value,
230 ) -> Self {
231 Self {
232 name: name.into(),
233 description: description.into(),
234 input_schema,
235 }
236 }
237
238 fn as_value(&self) -> Value {
239 let mut value = Map::new();
240 value.insert("name".to_string(), Value::String(self.name.clone()));
241 value.insert(
242 "description".to_string(),
243 Value::String(self.description.clone()),
244 );
245 value.insert("inputSchema".to_string(), self.input_schema.clone());
246 Value::Object(value)
247 }
248}
249
/// Target of a thread resume: either the latest thread or one named by id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResumeThread {
    /// Resume without naming a specific thread id.
    Latest,
    /// Resume the thread with the given id.
    ById(String),
}

/// An owned string is taken as a thread id.
impl From<String> for ResumeThread {
    fn from(value: String) -> Self {
        ResumeThread::ById(value)
    }
}

/// A string slice is copied and taken as a thread id.
impl From<&str> for ResumeThread {
    fn from(value: &str) -> Self {
        ResumeThread::ById(value.to_owned())
    }
}

/// A borrowed `String` is cloned and taken as a thread id.
impl From<&String> for ResumeThread {
    fn from(value: &String) -> Self {
        ResumeThread::ById(String::from(value.as_str()))
    }
}
274
275#[derive(Debug, Clone, Default)]
276pub struct ThreadOptions {
277 pub model: Option<String>,
278 pub model_provider: Option<String>,
279 pub sandbox_mode: Option<SandboxMode>,
280 pub sandbox_policy: Option<Value>,
281 pub working_directory: Option<String>,
282 pub skip_git_repo_check: Option<bool>,
283 pub model_reasoning_effort: Option<ModelReasoningEffort>,
284 pub model_reasoning_summary: Option<ModelReasoningSummary>,
285 pub network_access_enabled: Option<bool>,
286 pub web_search_mode: Option<WebSearchMode>,
287 pub web_search_enabled: Option<bool>,
288 pub approval_policy: Option<ApprovalMode>,
289 pub additional_directories: Option<Vec<String>>,
290 pub personality: Option<Personality>,
291 pub base_instructions: Option<String>,
292 pub developer_instructions: Option<String>,
293 pub ephemeral: Option<bool>,
294 pub collaboration_mode: Option<CollaborationMode>,
295 pub config: Option<Map<String, Value>>,
296 pub dynamic_tools: Option<Vec<DynamicToolSpec>>,
297 pub experimental_raw_events: Option<bool>,
298 pub persist_extended_history: Option<bool>,
299}
300
301impl ThreadOptions {
302 pub fn builder() -> ThreadOptionsBuilder {
303 ThreadOptionsBuilder::new()
304 }
305}
306
307#[derive(Debug, Clone, Default)]
308pub struct ThreadOptionsBuilder {
309 options: ThreadOptions,
310}
311
312impl ThreadOptionsBuilder {
313 pub fn new() -> Self {
314 Self::default()
315 }
316
317 pub fn build(self) -> ThreadOptions {
318 self.options
319 }
320
321 pub fn model(mut self, model: impl Into<String>) -> Self {
322 self.options.model = Some(model.into());
323 self
324 }
325
326 pub fn model_provider(mut self, model_provider: impl Into<String>) -> Self {
327 self.options.model_provider = Some(model_provider.into());
328 self
329 }
330
331 pub fn sandbox_mode(mut self, sandbox_mode: SandboxMode) -> Self {
332 self.options.sandbox_mode = Some(sandbox_mode);
333 self
334 }
335
336 pub fn sandbox_policy(mut self, sandbox_policy: Value) -> Self {
337 self.options.sandbox_policy = Some(sandbox_policy);
338 self
339 }
340
341 pub fn working_directory(mut self, working_directory: impl Into<String>) -> Self {
342 self.options.working_directory = Some(working_directory.into());
343 self
344 }
345
346 pub fn skip_git_repo_check(mut self, enabled: bool) -> Self {
347 self.options.skip_git_repo_check = Some(enabled);
348 self
349 }
350
351 pub fn model_reasoning_effort(mut self, model_reasoning_effort: ModelReasoningEffort) -> Self {
352 self.options.model_reasoning_effort = Some(model_reasoning_effort);
353 self
354 }
355
356 pub fn model_reasoning_summary(
357 mut self,
358 model_reasoning_summary: ModelReasoningSummary,
359 ) -> Self {
360 self.options.model_reasoning_summary = Some(model_reasoning_summary);
361 self
362 }
363
364 pub fn network_access_enabled(mut self, enabled: bool) -> Self {
365 self.options.network_access_enabled = Some(enabled);
366 self
367 }
368
369 pub fn web_search_mode(mut self, web_search_mode: WebSearchMode) -> Self {
370 self.options.web_search_mode = Some(web_search_mode);
371 self
372 }
373
374 pub fn web_search_enabled(mut self, enabled: bool) -> Self {
375 self.options.web_search_enabled = Some(enabled);
376 self
377 }
378
379 pub fn approval_policy(mut self, approval_policy: ApprovalMode) -> Self {
380 self.options.approval_policy = Some(approval_policy);
381 self
382 }
383
384 pub fn additional_directories(mut self, additional_directories: Vec<String>) -> Self {
385 self.options.additional_directories = Some(additional_directories);
386 self
387 }
388
389 pub fn add_directory(mut self, directory: impl Into<String>) -> Self {
390 self.options
391 .additional_directories
392 .get_or_insert_with(Vec::new)
393 .push(directory.into());
394 self
395 }
396
397 pub fn personality(mut self, personality: Personality) -> Self {
398 self.options.personality = Some(personality);
399 self
400 }
401
402 pub fn base_instructions(mut self, base_instructions: impl Into<String>) -> Self {
403 self.options.base_instructions = Some(base_instructions.into());
404 self
405 }
406
407 pub fn developer_instructions(mut self, developer_instructions: impl Into<String>) -> Self {
408 self.options.developer_instructions = Some(developer_instructions.into());
409 self
410 }
411
412 pub fn ephemeral(mut self, ephemeral: bool) -> Self {
413 self.options.ephemeral = Some(ephemeral);
414 self
415 }
416
417 pub fn collaboration_mode(mut self, collaboration_mode: CollaborationMode) -> Self {
418 self.options.collaboration_mode = Some(collaboration_mode);
419 self
420 }
421
422 pub fn config(mut self, config: Map<String, Value>) -> Self {
423 self.options.config = Some(config);
424 self
425 }
426
427 pub fn insert_config(mut self, key: impl Into<String>, value: Value) -> Self {
428 self.options
429 .config
430 .get_or_insert_with(Map::new)
431 .insert(key.into(), value);
432 self
433 }
434
435 pub fn dynamic_tools(mut self, dynamic_tools: Vec<DynamicToolSpec>) -> Self {
436 self.options.dynamic_tools = Some(dynamic_tools);
437 self
438 }
439
440 pub fn experimental_raw_events(mut self, enabled: bool) -> Self {
441 self.options.experimental_raw_events = Some(enabled);
442 self
443 }
444
445 pub fn persist_extended_history(mut self, enabled: bool) -> Self {
446 self.options.persist_extended_history = Some(enabled);
447 self
448 }
449}
450
451#[derive(Debug, Clone, Default)]
452pub struct TurnOptions {
453 pub output_schema: Option<Value>,
454 pub working_directory: Option<String>,
455 pub model: Option<String>,
456 pub model_provider: Option<String>,
457 pub model_reasoning_effort: Option<ModelReasoningEffort>,
458 pub model_reasoning_summary: Option<ModelReasoningSummary>,
459 pub personality: Option<Personality>,
460 pub approval_policy: Option<ApprovalMode>,
461 pub sandbox_policy: Option<Value>,
462 pub collaboration_mode: Option<CollaborationMode>,
463 pub skip_git_repo_check: Option<bool>,
464 pub web_search_mode: Option<WebSearchMode>,
465 pub web_search_enabled: Option<bool>,
466 pub network_access_enabled: Option<bool>,
467 pub additional_directories: Option<Vec<String>>,
468 pub extra: Option<Map<String, Value>>,
469}
470
471impl TurnOptions {
472 pub fn builder() -> TurnOptionsBuilder {
473 TurnOptionsBuilder::new()
474 }
475
476 pub fn with_output_schema(mut self, output_schema: Value) -> Self {
477 self.output_schema = Some(output_schema);
478 self
479 }
480
481 pub fn with_output_schema_for<T: OpenAiSerializable>(mut self) -> Self {
482 self.output_schema = Some(T::openai_output_schema());
483 self
484 }
485
486 pub fn with_model(mut self, model: impl Into<String>) -> Self {
487 self.model = Some(model.into());
488 self
489 }
490
491 pub fn with_working_directory(mut self, working_directory: impl Into<String>) -> Self {
492 self.working_directory = Some(working_directory.into());
493 self
494 }
495}
496
497#[derive(Debug, Clone, Default)]
498pub struct TurnOptionsBuilder {
499 options: TurnOptions,
500}
501
502impl TurnOptionsBuilder {
503 pub fn new() -> Self {
504 Self::default()
505 }
506
507 pub fn build(self) -> TurnOptions {
508 self.options
509 }
510
511 pub fn output_schema(mut self, output_schema: Value) -> Self {
512 self.options.output_schema = Some(output_schema);
513 self
514 }
515
516 pub fn output_schema_for<T: OpenAiSerializable>(mut self) -> Self {
517 self.options.output_schema = Some(T::openai_output_schema());
518 self
519 }
520
521 pub fn clear_output_schema(mut self) -> Self {
522 self.options.output_schema = None;
523 self
524 }
525
526 pub fn working_directory(mut self, working_directory: impl Into<String>) -> Self {
527 self.options.working_directory = Some(working_directory.into());
528 self
529 }
530
531 pub fn model(mut self, model: impl Into<String>) -> Self {
532 self.options.model = Some(model.into());
533 self
534 }
535
536 pub fn model_provider(mut self, model_provider: impl Into<String>) -> Self {
537 self.options.model_provider = Some(model_provider.into());
538 self
539 }
540
541 pub fn model_reasoning_effort(mut self, model_reasoning_effort: ModelReasoningEffort) -> Self {
542 self.options.model_reasoning_effort = Some(model_reasoning_effort);
543 self
544 }
545
546 pub fn model_reasoning_summary(
547 mut self,
548 model_reasoning_summary: ModelReasoningSummary,
549 ) -> Self {
550 self.options.model_reasoning_summary = Some(model_reasoning_summary);
551 self
552 }
553
554 pub fn personality(mut self, personality: Personality) -> Self {
555 self.options.personality = Some(personality);
556 self
557 }
558
559 pub fn approval_policy(mut self, approval_policy: ApprovalMode) -> Self {
560 self.options.approval_policy = Some(approval_policy);
561 self
562 }
563
564 pub fn sandbox_policy(mut self, sandbox_policy: Value) -> Self {
565 self.options.sandbox_policy = Some(sandbox_policy);
566 self
567 }
568
569 pub fn collaboration_mode(mut self, collaboration_mode: CollaborationMode) -> Self {
570 self.options.collaboration_mode = Some(collaboration_mode);
571 self
572 }
573
574 pub fn skip_git_repo_check(mut self, enabled: bool) -> Self {
575 self.options.skip_git_repo_check = Some(enabled);
576 self
577 }
578
579 pub fn web_search_mode(mut self, web_search_mode: WebSearchMode) -> Self {
580 self.options.web_search_mode = Some(web_search_mode);
581 self
582 }
583
584 pub fn web_search_enabled(mut self, enabled: bool) -> Self {
585 self.options.web_search_enabled = Some(enabled);
586 self
587 }
588
589 pub fn network_access_enabled(mut self, enabled: bool) -> Self {
590 self.options.network_access_enabled = Some(enabled);
591 self
592 }
593
594 pub fn additional_directories(mut self, additional_directories: Vec<String>) -> Self {
595 self.options.additional_directories = Some(additional_directories);
596 self
597 }
598
599 pub fn add_directory(mut self, directory: impl Into<String>) -> Self {
600 self.options
601 .additional_directories
602 .get_or_insert_with(Vec::new)
603 .push(directory.into());
604 self
605 }
606
607 pub fn extra(mut self, extra: Map<String, Value>) -> Self {
608 self.options.extra = Some(extra);
609 self
610 }
611
612 pub fn insert_extra(mut self, key: impl Into<String>, value: Value) -> Self {
613 self.options
614 .extra
615 .get_or_insert_with(Map::new)
616 .insert(key.into(), value);
617 self
618 }
619}
620
/// Error payload consisting of a single message string.
///
/// Carried by `ThreadEvent::TurnFailed` and by `McpToolCallItem::error`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadError {
    /// The error message text.
    pub message: String,
}
625
/// Error type returned by `Codex::ask` and `Codex::ask_with_options`.
#[derive(Debug, Error)]
pub enum ThreadRunError {
    /// A wrapped [`ClientError`]; `#[error(transparent)]` delegates display
    /// and source to the inner error, and `#[from]` provides the `From`
    /// conversion that `?` relies on.
    #[error(transparent)]
    Client(#[from] ClientError),
    /// A failed turn; displays as the contained `message`.
    #[error("{message}")]
    TurnFailed { message: String },
}
633
/// A single piece of user input: text or a local image referenced by path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UserInput {
    /// Plain text.
    Text { text: String },
    /// An image given by its local path string.
    LocalImage { path: String },
}

/// Input for a turn: a bare text string or a list of [`UserInput`] items.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Input {
    /// A single text string.
    Text(String),
    /// A list of input items.
    Items(Vec<UserInput>),
}

impl Input {
    /// Builds [`Input::Text`] from anything convertible into a `String`.
    pub fn text(text: impl Into<String>) -> Self {
        let text: String = text.into();
        Input::Text(text)
    }

    /// Builds [`Input::Items`] from a list of items.
    pub fn items(items: Vec<UserInput>) -> Self {
        Input::Items(items)
    }
}

/// An owned string becomes [`Input::Text`].
impl From<String> for Input {
    fn from(value: String) -> Self {
        Input::text(value)
    }
}

/// A string slice is copied into [`Input::Text`].
impl From<&str> for Input {
    fn from(value: &str) -> Self {
        Input::text(value)
    }
}

/// A list of items becomes [`Input::Items`].
impl From<Vec<UserInput>> for Input {
    fn from(value: Vec<UserInput>) -> Self {
        Input::items(value)
    }
}
673
/// Token counters attached to a turn (see `Turn::usage` and
/// `ThreadEvent::TurnCompleted`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Usage {
    /// Input token count.
    pub input_tokens: i64,
    /// Cached input token count.
    // NOTE(review): whether this is a subset of `input_tokens` is not visible here — confirm.
    pub cached_input_tokens: i64,
    /// Output token count.
    pub output_tokens: i64,
}
680
/// Result of a turn: its items, a final response string and optional usage.
#[derive(Debug, Clone, PartialEq)]
pub struct Turn {
    /// Items collected for the turn.
    pub items: Vec<ThreadItem>,
    /// Final response text.
    pub final_response: String,
    /// Token usage, when available.
    pub usage: Option<Usage>,
}

/// Alias for [`Turn`].
pub type RunResult = Turn;
689
690pub struct StreamedTurn {
691 receiver: mpsc::Receiver<Result<ThreadEvent, ClientError>>,
692 task: JoinHandle<()>,
693}
694
695impl StreamedTurn {
696 pub async fn next_event(&mut self) -> Option<Result<ThreadEvent, ClientError>> {
697 self.receiver.recv().await
698 }
699}
700
701impl Drop for StreamedTurn {
702 fn drop(&mut self) {
703 self.task.abort();
704 }
705}
706
/// Event type yielded (wrapped in a `Result`) by `StreamedTurn::next_event`.
#[derive(Debug, Clone, PartialEq)]
pub enum ThreadEvent {
    /// A thread started; carries its id.
    ThreadStarted { thread_id: String },
    /// A turn started.
    TurnStarted,
    /// A turn completed; carries usage when available.
    TurnCompleted { usage: Option<Usage> },
    /// A turn failed; carries the error payload.
    TurnFailed { error: ThreadError },
    /// An item started.
    ItemStarted { item: ThreadItem },
    /// An item was updated.
    ItemUpdated { item: ThreadItem },
    /// An item completed.
    ItemCompleted { item: ThreadItem },
    /// An error carrying only a message string.
    Error { message: String },
}
718
/// Phase tag of an agent message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentMessagePhase {
    /// Rendered as `"commentary"`.
    Commentary,
    /// Rendered as `"final_answer"`.
    FinalAnswer,
    /// Rendered as `"unknown"`.
    Unknown,
}

impl AgentMessagePhase {
    /// Returns the fixed snake_case string identifier of this phase.
    pub fn as_str(self) -> &'static str {
        match self {
            AgentMessagePhase::Commentary => "commentary",
            AgentMessagePhase::FinalAnswer => "final_answer",
            AgentMessagePhase::Unknown => "unknown",
        }
    }
}

/// Agent message item: an id, its text and an optional phase.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentMessageItem {
    /// Item id.
    pub id: String,
    /// Message text.
    pub text: String,
    /// Phase of the message, if one is recorded.
    pub phase: Option<AgentMessagePhase>,
}

impl AgentMessageItem {
    /// Returns `true` only when `phase` is `Some(AgentMessagePhase::FinalAnswer)`;
    /// a missing phase counts as not final.
    pub fn is_final_answer(&self) -> bool {
        self.phase == Some(AgentMessagePhase::FinalAnswer)
    }
}
748
/// One content entry of a [`UserMessageItem`].
#[derive(Debug, Clone, PartialEq)]
pub enum UserMessageContentItem {
    /// Text content.
    Text { text: String },
    /// Image referenced by a URL string.
    Image { url: String },
    /// Image referenced by a local path string.
    LocalImage { path: String },
    /// Any other content, kept as its raw JSON value.
    Unknown(Value),
}

/// User message item: an id plus its list of content entries.
#[derive(Debug, Clone, PartialEq)]
pub struct UserMessageItem {
    /// Item id.
    pub id: String,
    /// Content entries of the message.
    pub content: Vec<UserMessageContentItem>,
}
762
/// Payload of `ThreadItem::Plan`: an id and a text string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlanItem {
    /// Item id.
    pub id: String,
    /// Plan text.
    pub text: String,
}

/// Payload of `ThreadItem::Reasoning`: an id and a text string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReasoningItem {
    /// Item id.
    pub id: String,
    /// Reasoning text.
    pub text: String,
}
774
/// Status of a [`CommandExecutionItem`].
// NOTE(review): `Unknown` is presumably the fallback for unrecognized status
// strings; the parsing code is outside this view — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommandExecutionStatus {
    InProgress,
    Completed,
    Failed,
    Declined,
    Unknown,
}

/// Payload of `ThreadItem::CommandExecution`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandExecutionItem {
    /// Item id.
    pub id: String,
    /// The command, as a single string.
    pub command: String,
    /// Aggregated output text of the command.
    pub aggregated_output: String,
    /// Exit code, when one is available.
    pub exit_code: Option<i32>,
    /// Current status of the execution.
    pub status: CommandExecutionStatus,
}
792
/// Kind of change recorded in a [`FileUpdateChange`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PatchChangeKind {
    Add,
    Delete,
    Update,
    Unknown,
}

/// One changed path together with the kind of change.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileUpdateChange {
    /// Path of the changed file, as a string.
    pub path: String,
    /// Kind of change applied to the path.
    pub kind: PatchChangeKind,
}

/// Status of a [`FileChangeItem`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PatchApplyStatus {
    InProgress,
    Completed,
    Failed,
    Declined,
    Unknown,
}

/// Payload of `ThreadItem::FileChange`: a set of file changes and their status.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileChangeItem {
    /// Item id.
    pub id: String,
    /// The individual file changes.
    pub changes: Vec<FileUpdateChange>,
    /// Status of applying the changes.
    pub status: PatchApplyStatus,
}
822
/// Status of a [`McpToolCallItem`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum McpToolCallStatus {
    InProgress,
    Completed,
    Failed,
    Unknown,
}

/// Payload of `ThreadItem::McpToolCall`.
#[derive(Debug, Clone, PartialEq)]
pub struct McpToolCallItem {
    /// Item id.
    pub id: String,
    /// Server name.
    pub server: String,
    /// Tool name.
    pub tool: String,
    /// Call arguments as a raw JSON value.
    pub arguments: Value,
    /// Call result as a raw JSON value, when present.
    pub result: Option<Value>,
    /// Error payload, when present.
    pub error: Option<ThreadError>,
    /// Current status of the call.
    pub status: McpToolCallStatus,
}
841
/// Payload of `ThreadItem::DynamicToolCall`.
#[derive(Debug, Clone, PartialEq)]
pub struct DynamicToolCallItem {
    /// Item id.
    pub id: String,
    /// Tool name.
    pub tool: String,
    /// Call arguments as a raw JSON value.
    pub arguments: Value,
    /// Status kept as a free-form string (no enum is defined for it here).
    pub status: String,
    /// Content items as raw JSON values.
    pub content_items: Vec<Value>,
    /// Success flag, when present.
    pub success: Option<bool>,
    /// Duration, when present; presumably milliseconds per the field name — confirm.
    pub duration_ms: Option<u64>,
}
852
/// Payload of `ThreadItem::CollabToolCall`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollabToolCallItem {
    /// Item id.
    pub id: String,
    /// Tool name.
    pub tool: String,
    /// Status kept as a free-form string.
    pub status: String,
    /// Id of the sending thread.
    pub sender_thread_id: String,
    /// Id of the receiving thread, when present.
    pub receiver_thread_id: Option<String>,
    /// Id of a newly created thread, when present.
    pub new_thread_id: Option<String>,
    /// Prompt text, when present.
    pub prompt: Option<String>,
    /// Agent status string, when present.
    pub agent_status: Option<String>,
}
864
/// Payload of `ThreadItem::WebSearch`: an id and the query string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WebSearchItem {
    pub id: String,
    pub query: String,
}

/// Payload of `ThreadItem::ImageView`: an id and a path string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImageViewItem {
    pub id: String,
    pub path: String,
}

/// Payload shared by `ThreadItem::EnteredReviewMode` and
/// `ThreadItem::ExitedReviewMode`: an id and a review string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReviewModeItem {
    pub id: String,
    pub review: String,
}

/// Payload of `ThreadItem::ContextCompaction`; carries only an id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContextCompactionItem {
    pub id: String,
}

/// One entry of a [`TodoListItem`]: its text and a completion flag.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TodoItem {
    pub text: String,
    pub completed: bool,
}

/// Payload of `ThreadItem::TodoList`: an id and its entries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TodoListItem {
    pub id: String,
    pub items: Vec<TodoItem>,
}

/// Payload of `ThreadItem::Error`: an id and a message string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErrorItem {
    pub id: String,
    pub message: String,
}

/// Payload of `ThreadItem::Unknown`: the raw JSON value plus whatever id and
/// type string are available for it.
#[derive(Debug, Clone, PartialEq)]
pub struct UnknownItem {
    /// Item id, when present.
    pub id: Option<String>,
    /// Item type string, when present.
    pub item_type: Option<String>,
    /// The raw JSON value of the item.
    pub raw: Value,
}
912
/// An item of a thread. Each variant wraps the payload struct of the same
/// concept; `Unknown` keeps the raw JSON in an [`UnknownItem`].
#[derive(Debug, Clone, PartialEq)]
pub enum ThreadItem {
    AgentMessage(AgentMessageItem),
    UserMessage(UserMessageItem),
    Plan(PlanItem),
    Reasoning(ReasoningItem),
    CommandExecution(CommandExecutionItem),
    FileChange(FileChangeItem),
    McpToolCall(McpToolCallItem),
    DynamicToolCall(DynamicToolCallItem),
    CollabToolCall(CollabToolCallItem),
    WebSearch(WebSearchItem),
    ImageView(ImageViewItem),
    // Entering and exiting review mode share the same payload type.
    EnteredReviewMode(ReviewModeItem),
    ExitedReviewMode(ReviewModeItem),
    ContextCompaction(ContextCompactionItem),
    TodoList(TodoListItem),
    Error(ErrorItem),
    Unknown(UnknownItem),
}
933
// Generates `pub async fn $method(&self, params: $params_ty) -> Result<$result_ty, ClientError>`.
// The generated method first awaits `self.ensure_initialized()` (returning
// early on error via `?`) and then forwards `params` to the method of the
// same name on `self.inner.client`.
macro_rules! codex_forward_typed_method {
    ($method:ident, $params_ty:ty, $result_ty:ty) => {
        pub async fn $method(&self, params: $params_ty) -> Result<$result_ty, ClientError> {
            self.ensure_initialized().await?;
            self.inner.client.$method(params).await
        }
    };
}

// Same as `codex_forward_typed_method!`, but for client methods that take no
// parameters: generates `pub async fn $method(&self) -> Result<$result_ty, ClientError>`.
macro_rules! codex_forward_null_method {
    ($method:ident, $result_ty:ty) => {
        pub async fn $method(&self) -> Result<$result_ty, ClientError> {
            self.ensure_initialized().await?;
            self.inner.client.$method().await
        }
    };
}
951
/// Cloneable handle around a [`CodexClient`].
///
/// The state lives behind an `Arc`, so clones share the same client and
/// initialization state.
#[derive(Clone)]
pub struct Codex {
    inner: Arc<CodexInner>,
}

// State shared by every clone of `Codex`.
struct CodexInner {
    // The client that requests are forwarded to.
    client: CodexClient,
    // Parameters kept for the initialize request.
    initialize_params: requests::InitializeParams,
    // Starts as `false` (see `with_initialize_params`).
    // NOTE(review): presumably set by `ensure_initialized`, which is outside this view — confirm.
    initialized: AtomicBool,
    // NOTE(review): presumably serializes concurrent initialization attempts — confirm.
    initialize_lock: Mutex<()>,
}
963
964impl Codex {
965 pub fn with_initialize_params(
966 client: CodexClient,
967 initialize_params: requests::InitializeParams,
968 ) -> Self {
969 Self {
970 inner: Arc::new(CodexInner {
971 client,
972 initialize_params,
973 initialized: AtomicBool::new(false),
974 initialize_lock: Mutex::new(()),
975 }),
976 }
977 }
978
979 pub fn from_client(client: CodexClient) -> Self {
980 let initialize_params = requests::InitializeParams::new(requests::ClientInfo::new(
981 "codex_sdk_rs",
982 "Codex Rust SDK",
983 env!("CARGO_PKG_VERSION"),
984 ));
985 Self::with_initialize_params(client, initialize_params)
986 }
987
988 pub async fn spawn_stdio(config: StdioConfig) -> Result<Self, ClientError> {
989 let client = CodexClient::spawn_stdio(config).await?;
990 Ok(Self::from_client(client))
991 }
992
993 pub async fn connect_ws(config: WsConfig) -> Result<Self, ClientError> {
994 let client = CodexClient::connect_ws(config).await?;
995 Ok(Self::from_client(client))
996 }
997
998 pub async fn start_and_connect_ws(config: WsConfig) -> Result<Self, ClientError> {
999 let client = CodexClient::start_and_connect_ws(config).await?;
1000 Ok(Self::from_client(client))
1001 }
1002
1003 pub fn start_thread(&self, options: ThreadOptions) -> Thread {
1004 Thread {
1005 codex: self.clone(),
1006 id: None,
1007 pending_resume: None,
1008 last_turn_id: None,
1009 options,
1010 }
1011 }
1012
1013 pub fn resume_thread(&self, target: impl Into<ResumeThread>, options: ThreadOptions) -> Thread {
1014 let target = target.into();
1015 let id = match &target {
1016 ResumeThread::Latest => None,
1017 ResumeThread::ById(thread_id) => Some(thread_id.clone()),
1018 };
1019
1020 Thread {
1021 codex: self.clone(),
1022 id,
1023 pending_resume: Some(target),
1024 last_turn_id: None,
1025 options,
1026 }
1027 }
1028
1029 pub fn resume_thread_by_id(&self, id: impl Into<String>, options: ThreadOptions) -> Thread {
1030 self.resume_thread(ResumeThread::ById(id.into()), options)
1031 }
1032
1033 pub fn resume_latest_thread(&self, options: ThreadOptions) -> Thread {
1034 self.resume_thread(ResumeThread::Latest, options)
1035 }
1036
1037 pub async fn ask(&self, input: impl Into<Input>) -> Result<String, ThreadRunError> {
1039 self.ask_with_options(input, ThreadOptions::default(), TurnOptions::default())
1040 .await
1041 }
1042
1043 pub async fn ask_with_options(
1045 &self,
1046 input: impl Into<Input>,
1047 thread_options: ThreadOptions,
1048 turn_options: TurnOptions,
1049 ) -> Result<String, ThreadRunError> {
1050 let mut thread = self.start_thread(thread_options);
1051 thread.ask(input, turn_options).await
1052 }
1053
1054 pub async fn thread_list(
1056 &self,
1057 params: requests::ThreadListParams,
1058 ) -> Result<responses::ThreadListResult, ClientError> {
1059 self.ensure_initialized().await?;
1060 self.inner.client.thread_list(params).await
1061 }
1062
    // Each invocation below expands to a `pub async fn` with the given name
    // that awaits `ensure_initialized` and then forwards `params` to the
    // same-named method on the inner `CodexClient`
    // (see `codex_forward_typed_method!`).

    // Thread methods.
    codex_forward_typed_method!(
        thread_start,
        requests::ThreadStartParams,
        responses::ThreadResult
    );
    codex_forward_typed_method!(
        thread_resume,
        requests::ThreadResumeParams,
        responses::ThreadResult
    );
    codex_forward_typed_method!(
        thread_fork,
        requests::ThreadForkParams,
        responses::ThreadResult
    );
    codex_forward_typed_method!(
        thread_archive,
        requests::ThreadArchiveParams,
        responses::ThreadArchiveResult
    );
    codex_forward_typed_method!(
        thread_name_set,
        requests::ThreadSetNameParams,
        responses::ThreadSetNameResult
    );
    codex_forward_typed_method!(
        thread_unarchive,
        requests::ThreadUnarchiveParams,
        responses::ThreadUnarchiveResult
    );
    codex_forward_typed_method!(
        thread_compact_start,
        requests::ThreadCompactStartParams,
        responses::ThreadCompactStartResult
    );
    codex_forward_typed_method!(
        thread_background_terminals_clean,
        requests::ThreadBackgroundTerminalsCleanParams,
        responses::ThreadBackgroundTerminalsCleanResult
    );
    codex_forward_typed_method!(
        thread_rollback,
        requests::ThreadRollbackParams,
        responses::ThreadRollbackResult
    );
    codex_forward_typed_method!(
        thread_loaded_list,
        requests::ThreadLoadedListParams,
        responses::ThreadLoadedListResult
    );
    codex_forward_typed_method!(
        thread_read,
        requests::ThreadReadParams,
        responses::ThreadReadResult
    );

    // Skills and apps methods. `skills_remote_list` / `skills_remote_export`
    // take the `SkillsRemoteRead*` / `SkillsRemoteWrite*` types and are also
    // reachable through the `skills_remote_read` / `skills_remote_write`
    // wrappers defined further down.
    codex_forward_typed_method!(
        skills_list,
        requests::SkillsListParams,
        responses::SkillsListResult
    );
    codex_forward_typed_method!(
        skills_remote_list,
        requests::SkillsRemoteReadParams,
        responses::SkillsRemoteReadResult
    );
    codex_forward_typed_method!(
        skills_remote_export,
        requests::SkillsRemoteWriteParams,
        responses::SkillsRemoteWriteResult
    );
    codex_forward_typed_method!(
        app_list,
        requests::AppsListParams,
        responses::AppsListResult
    );
    codex_forward_typed_method!(
        skills_config_write,
        requests::SkillsConfigWriteParams,
        responses::SkillsConfigWriteResult
    );

    // Turn and review methods.
    codex_forward_typed_method!(turn_start, requests::TurnStartParams, responses::TurnResult);
    codex_forward_typed_method!(
        turn_steer,
        requests::TurnSteerParams,
        responses::TurnSteerResult
    );
    codex_forward_typed_method!(turn_interrupt, requests::TurnInterruptParams, EmptyObject);
    codex_forward_typed_method!(
        review_start,
        requests::ReviewStartParams,
        responses::ReviewStartResult
    );

    // Model, feature and collaboration-mode listings.
    codex_forward_typed_method!(
        model_list,
        requests::ModelListParams,
        responses::ModelListResult
    );
    codex_forward_typed_method!(
        experimental_feature_list,
        requests::ExperimentalFeatureListParams,
        responses::ExperimentalFeatureListResult
    );
    codex_forward_typed_method!(
        collaboration_mode_list,
        requests::CollaborationModeListParams,
        responses::CollaborationModeListResult
    );
    codex_forward_typed_method!(
        mock_experimental_method,
        requests::MockExperimentalMethodParams,
        responses::MockExperimentalMethodResult
    );

    // MCP server methods.
    codex_forward_typed_method!(
        mcp_server_oauth_login,
        requests::McpServerOauthLoginParams,
        responses::McpServerOauthLoginResult
    );
    codex_forward_typed_method!(
        mcp_server_status_list,
        requests::ListMcpServerStatusParams,
        responses::McpServerStatusListResult
    );

    // Sandbox setup, account login, feedback and command execution.
    codex_forward_typed_method!(
        windows_sandbox_setup_start,
        requests::WindowsSandboxSetupStartParams,
        responses::WindowsSandboxSetupStartResult
    );
    codex_forward_typed_method!(
        account_login_start,
        requests::LoginAccountParams,
        responses::LoginAccountResult
    );
    codex_forward_typed_method!(
        account_login_cancel,
        requests::CancelLoginAccountParams,
        EmptyObject
    );
    codex_forward_typed_method!(
        feedback_upload,
        requests::FeedbackUploadParams,
        responses::FeedbackUploadResult
    );
    codex_forward_typed_method!(
        command_exec,
        requests::CommandExecParams,
        responses::CommandExecResult
    );

    // Config and account reads/writes.
    codex_forward_typed_method!(
        config_read,
        requests::ConfigReadParams,
        responses::ConfigReadResult
    );
    codex_forward_typed_method!(
        config_value_write,
        requests::ConfigValueWriteParams,
        responses::ConfigValueWriteResult
    );
    codex_forward_typed_method!(
        config_batch_write,
        requests::ConfigBatchWriteParams,
        responses::ConfigBatchWriteResult
    );
    codex_forward_typed_method!(
        account_read,
        requests::GetAccountParams,
        responses::GetAccountResult
    );

    // Fuzzy file search session methods.
    codex_forward_typed_method!(
        fuzzy_file_search_session_start,
        requests::FuzzyFileSearchSessionStartParams,
        responses::FuzzyFileSearchSessionStartResult
    );
    codex_forward_typed_method!(
        fuzzy_file_search_session_update,
        requests::FuzzyFileSearchSessionUpdateParams,
        responses::FuzzyFileSearchSessionUpdateResult
    );
    codex_forward_typed_method!(
        fuzzy_file_search_session_stop,
        requests::FuzzyFileSearchSessionStopParams,
        responses::FuzzyFileSearchSessionStopResult
    );
1245
1246 pub async fn skills_remote_read(
1247 &self,
1248 params: requests::SkillsRemoteReadParams,
1249 ) -> Result<responses::SkillsRemoteReadResult, ClientError> {
1250 self.skills_remote_list(params).await
1251 }
1252
1253 pub async fn skills_remote_write(
1254 &self,
1255 params: requests::SkillsRemoteWriteParams,
1256 ) -> Result<responses::SkillsRemoteWriteResult, ClientError> {
1257 self.skills_remote_export(params).await
1258 }
1259
1260 codex_forward_null_method!(config_mcp_server_reload, EmptyObject);
1261 codex_forward_null_method!(account_logout, EmptyObject);
1262 codex_forward_null_method!(
1263 account_rate_limits_read,
1264 responses::AccountRateLimitsReadResult
1265 );
1266 codex_forward_null_method!(
1267 config_requirements_read,
1268 responses::ConfigRequirementsReadResult
1269 );
1270
1271 pub async fn send_raw_request(
1272 &self,
1273 method: impl Into<String>,
1274 params: Value,
1275 timeout: Option<std::time::Duration>,
1276 ) -> Result<Value, ClientError> {
1277 self.ensure_initialized().await?;
1278 self.inner
1279 .client
1280 .send_raw_request(method, params, timeout)
1281 .await
1282 }
1283
1284 pub async fn send_raw_notification(
1285 &self,
1286 method: impl Into<String>,
1287 params: Value,
1288 ) -> Result<(), ClientError> {
1289 self.ensure_initialized().await?;
1290 self.inner
1291 .client
1292 .send_raw_notification(method, params)
1293 .await
1294 }
1295
1296 pub fn client(&self) -> CodexClient {
1297 self.inner.client.clone()
1298 }
1299
1300 async fn ensure_initialized(&self) -> Result<(), ClientError> {
1301 if self.inner.initialized.load(Ordering::SeqCst) {
1302 return Ok(());
1303 }
1304
1305 let _guard = self.inner.initialize_lock.lock().await;
1306 if self.inner.initialized.load(Ordering::SeqCst) {
1307 return Ok(());
1308 }
1309
1310 match self
1311 .inner
1312 .client
1313 .initialize(self.inner.initialize_params.clone())
1314 .await
1315 {
1316 Ok(_) | Err(ClientError::AlreadyInitialized) => {}
1317 Err(err) => return Err(err),
1318 }
1319
1320 self.inner.client.initialized().await?;
1321 self.inner.initialized.store(true, Ordering::SeqCst);
1322 Ok(())
1323 }
1324}
1325
/// A conversation thread driven through a [`Codex`] handle.
///
/// The server-side thread is created or resumed on first use, not at construction.
pub struct Thread {
    /// Handle used to reach the underlying client.
    codex: Codex,
    /// Server-assigned thread id; `None` until the thread has been started or resumed.
    id: Option<String>,
    /// Resume request still to be performed; cleared once the resume succeeds.
    pending_resume: Option<ResumeThread>,
    /// Id of the most recently started turn; reset whenever the thread is started or resumed.
    last_turn_id: Option<String>,
    /// Thread-level options used to build start, resume, and turn params.
    options: ThreadOptions,
}
1333
1334impl Thread {
/// Returns the thread id, or `None` if the thread has not been started or resumed yet.
pub fn id(&self) -> Option<&str> {
    self.id.as_deref()
}
1338
1339 async fn ensure_thread_started_or_resumed(
1340 &mut self,
1341 ) -> Result<(String, Option<String>), ClientError> {
1342 self.codex.ensure_initialized().await?;
1343
1344 let mut emit_thread_started = None;
1345 if let Some(pending_resume) = self.pending_resume.clone() {
1346 let thread_id = match pending_resume {
1347 ResumeThread::ById(thread_id) => thread_id,
1348 ResumeThread::Latest => self.resolve_latest_thread_id().await?,
1349 };
1350 let resume_params = build_thread_resume_params(&thread_id, &self.options);
1351 let resumed = self.codex.inner.client.thread_resume(resume_params).await?;
1352 self.id = Some(resumed.thread.id);
1353 self.pending_resume = None;
1354 self.last_turn_id = None;
1355 } else if self.id.is_none() {
1356 let thread = self
1357 .codex
1358 .inner
1359 .client
1360 .thread_start(build_thread_start_params(&self.options))
1361 .await?;
1362 self.id = Some(thread.thread.id.clone());
1363 emit_thread_started = Some(thread.thread.id);
1364 self.last_turn_id = None;
1365 }
1366
1367 let thread_id = self.id.clone().ok_or_else(|| {
1368 ClientError::TransportSend("thread id unavailable after start/resume".to_string())
1369 })?;
1370
1371 Ok((thread_id, emit_thread_started))
1372 }
1373
1374 async fn resolve_latest_thread_id(&self) -> Result<String, ClientError> {
1375 let mut cursor: Option<String> = None;
1376 let mut pages_scanned = 0usize;
1377 let mut threads = Vec::new();
1378
1379 loop {
1380 pages_scanned += 1;
1381 if pages_scanned > MAX_THREAD_LIST_PAGES {
1382 return Err(ClientError::TransportSend(format!(
1383 "could not resolve latest thread after scanning {MAX_THREAD_LIST_PAGES} pages"
1384 )));
1385 }
1386
1387 let result = self
1388 .codex
1389 .inner
1390 .client
1391 .thread_list(requests::ThreadListParams {
1392 limit: Some(THREAD_LIST_PAGE_LIMIT),
1393 cursor: cursor.clone(),
1394 ..Default::default()
1395 })
1396 .await?;
1397
1398 threads.extend(result.data);
1399
1400 match result.next_cursor {
1401 Some(next) => cursor = Some(next),
1402 None => break,
1403 }
1404 }
1405
1406 select_latest_thread_id(&threads, self.options.working_directory.as_deref()).ok_or_else(
1407 || match self.options.working_directory.as_deref() {
1408 Some(working_directory) => ClientError::TransportSend(format!(
1409 "no recorded thread found for working directory `{working_directory}`"
1410 )),
1411 None => ClientError::TransportSend("no recorded thread found".to_string()),
1412 },
1413 )
1414 }
1415
1416 pub async fn set_name(
1417 &mut self,
1418 name: impl Into<String>,
1419 ) -> Result<responses::ThreadSetNameResult, ClientError> {
1420 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1421 self.codex
1422 .inner
1423 .client
1424 .thread_name_set(requests::ThreadSetNameParams {
1425 thread_id,
1426 name: name.into(),
1427 extra: Map::new(),
1428 })
1429 .await
1430 }
1431
1432 pub async fn read(
1433 &mut self,
1434 include_turns: Option<bool>,
1435 ) -> Result<responses::ThreadReadResult, ClientError> {
1436 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1437 self.codex
1438 .inner
1439 .client
1440 .thread_read(requests::ThreadReadParams {
1441 thread_id,
1442 include_turns,
1443 extra: Map::new(),
1444 })
1445 .await
1446 }
1447
1448 pub async fn archive(&mut self) -> Result<responses::ThreadArchiveResult, ClientError> {
1449 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1450 self.codex
1451 .inner
1452 .client
1453 .thread_archive(requests::ThreadArchiveParams {
1454 thread_id,
1455 extra: Map::new(),
1456 })
1457 .await
1458 }
1459
1460 pub async fn unarchive(&mut self) -> Result<responses::ThreadUnarchiveResult, ClientError> {
1461 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1462 self.codex
1463 .inner
1464 .client
1465 .thread_unarchive(requests::ThreadUnarchiveParams {
1466 thread_id,
1467 extra: Map::new(),
1468 })
1469 .await
1470 }
1471
1472 pub async fn rollback(
1473 &mut self,
1474 count: u32,
1475 ) -> Result<responses::ThreadRollbackResult, ClientError> {
1476 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1477 self.codex
1478 .inner
1479 .client
1480 .thread_rollback(requests::ThreadRollbackParams {
1481 thread_id,
1482 count,
1483 extra: Map::new(),
1484 })
1485 .await
1486 }
1487
1488 pub async fn compact_start(
1489 &mut self,
1490 ) -> Result<responses::ThreadCompactStartResult, ClientError> {
1491 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1492 self.codex
1493 .inner
1494 .client
1495 .thread_compact_start(requests::ThreadCompactStartParams {
1496 thread_id,
1497 extra: Map::new(),
1498 })
1499 .await
1500 }
1501
1502 pub async fn steer(
1503 &mut self,
1504 input: impl Into<Input>,
1505 expected_turn_id: Option<String>,
1506 ) -> Result<responses::TurnSteerResult, ClientError> {
1507 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1508 let expected_turn_id = expected_turn_id.or_else(|| self.last_turn_id.clone());
1509 let expected_turn_id = expected_turn_id.ok_or_else(|| {
1510 ClientError::TransportSend(
1511 "turn/steer requires expected_turn_id or a previously started turn".to_string(),
1512 )
1513 })?;
1514 self.codex
1515 .inner
1516 .client
1517 .turn_steer(requests::TurnSteerParams {
1518 thread_id,
1519 input: normalize_input(input.into()),
1520 expected_turn_id: Some(expected_turn_id),
1521 extra: Map::new(),
1522 })
1523 .await
1524 }
1525
1526 pub async fn interrupt(&mut self, turn_id: impl Into<String>) -> Result<(), ClientError> {
1527 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1528 self.codex
1529 .inner
1530 .client
1531 .turn_interrupt(requests::TurnInterruptParams {
1532 thread_id,
1533 turn_id: turn_id.into(),
1534 extra: Map::new(),
1535 })
1536 .await?;
1537 Ok(())
1538 }
1539
1540 pub async fn run_streamed(
1541 &mut self,
1542 input: impl Into<Input>,
1543 turn_options: TurnOptions,
1544 ) -> Result<StreamedTurn, ClientError> {
1545 let (thread_id, emit_thread_started) = self.ensure_thread_started_or_resumed().await?;
1546
1547 let server_events = self.codex.inner.client.subscribe();
1548
1549 let turn_response = self
1550 .codex
1551 .inner
1552 .client
1553 .turn_start(build_turn_start_params(
1554 &thread_id,
1555 input.into(),
1556 &self.options,
1557 &turn_options,
1558 ))
1559 .await?;
1560 let turn_id = turn_response.turn.id;
1561 self.last_turn_id = Some(turn_id.clone());
1562
1563 let (tx, rx) = mpsc::channel(256);
1564
1565 if let Some(started_thread_id) = emit_thread_started
1566 && tx
1567 .send(Ok(ThreadEvent::ThreadStarted {
1568 thread_id: started_thread_id,
1569 }))
1570 .await
1571 .is_err()
1572 {
1573 return Err(ClientError::TransportClosed);
1574 }
1575
1576 if tx.send(Ok(ThreadEvent::TurnStarted)).await.is_err() {
1577 return Err(ClientError::TransportClosed);
1578 }
1579
1580 let task = tokio::spawn(async move {
1581 pump_turn_events(server_events, tx, thread_id, turn_id).await;
1582 });
1583
1584 Ok(StreamedTurn { receiver: rx, task })
1585 }
1586
1587 pub async fn run(
1588 &mut self,
1589 input: impl Into<Input>,
1590 turn_options: TurnOptions,
1591 ) -> Result<Turn, ThreadRunError> {
1592 let mut streamed = self.run_streamed(input, turn_options).await?;
1593 let mut items = Vec::new();
1594 let mut final_answer = None;
1595 let mut fallback_response = None;
1596 let mut usage = None;
1597 let mut saw_terminal = false;
1598
1599 while let Some(next) = streamed.next_event().await {
1600 let event = next.map_err(ThreadRunError::Client)?;
1601 match event {
1602 ThreadEvent::ItemCompleted { item } => {
1603 update_final_response_candidates(
1604 &item,
1605 &mut final_answer,
1606 &mut fallback_response,
1607 );
1608 items.push(item);
1609 }
1610 ThreadEvent::TurnCompleted { usage: completed } => {
1611 usage = completed;
1612 saw_terminal = true;
1613 break;
1614 }
1615 ThreadEvent::TurnFailed { error } => {
1616 return Err(ThreadRunError::TurnFailed {
1617 message: error.message,
1618 });
1619 }
1620 ThreadEvent::Error { message } => {
1621 return Err(ThreadRunError::TurnFailed { message });
1622 }
1623 ThreadEvent::ThreadStarted { .. }
1624 | ThreadEvent::TurnStarted
1625 | ThreadEvent::ItemStarted { .. }
1626 | ThreadEvent::ItemUpdated { .. } => {}
1627 }
1628 }
1629
1630 if !saw_terminal {
1631 return Err(ThreadRunError::Client(ClientError::TransportClosed));
1632 }
1633
1634 Ok(Turn {
1635 items,
1636 final_response: final_answer.or(fallback_response).unwrap_or_default(),
1637 usage,
1638 })
1639 }
1640
1641 pub async fn ask(
1643 &mut self,
1644 input: impl Into<Input>,
1645 turn_options: TurnOptions,
1646 ) -> Result<String, ThreadRunError> {
1647 let turn = self.run(input, turn_options).await?;
1648 Ok(turn.final_response)
1649 }
1650}
1651
1652async fn pump_turn_events(
1653 mut server_events: tokio::sync::broadcast::Receiver<ServerEvent>,
1654 tx: mpsc::Sender<Result<ThreadEvent, ClientError>>,
1655 thread_id: String,
1656 turn_id: String,
1657) {
1658 let mut latest_usage: Option<Usage> = None;
1659
1660 loop {
1661 let next = server_events.recv().await;
1662 let server_event = match next {
1663 Ok(event) => event,
1664 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
1665 let _ = tx.send(Err(ClientError::TransportClosed)).await;
1666 break;
1667 }
1668 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
1669 continue;
1670 }
1671 };
1672
1673 match server_event {
1674 ServerEvent::Notification(notification) => match notification {
1675 ServerNotification::ItemStarted(payload)
1676 if matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) =>
1677 {
1678 if tx
1679 .send(Ok(ThreadEvent::ItemStarted {
1680 item: parse_thread_item(payload.item),
1681 }))
1682 .await
1683 .is_err()
1684 {
1685 break;
1686 }
1687 }
1688 ServerNotification::ItemCompleted(payload)
1689 if matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) =>
1690 {
1691 if tx
1692 .send(Ok(ThreadEvent::ItemCompleted {
1693 item: parse_thread_item(payload.item),
1694 }))
1695 .await
1696 .is_err()
1697 {
1698 break;
1699 }
1700 }
1701 ServerNotification::ItemAgentMessageDelta(delta)
1702 if matches_target_from_extra(&delta.extra, &thread_id, Some(&turn_id)) =>
1703 {
1704 let text = delta.delta.or(delta.text).unwrap_or_default();
1705 if text.is_empty() {
1706 continue;
1707 }
1708 let item = ThreadItem::AgentMessage(AgentMessageItem {
1709 id: delta.item_id.unwrap_or_default(),
1710 text,
1711 phase: None,
1712 });
1713 if tx
1714 .send(Ok(ThreadEvent::ItemUpdated { item }))
1715 .await
1716 .is_err()
1717 {
1718 break;
1719 }
1720 }
1721 ServerNotification::ItemPlanDelta(delta)
1722 if matches_target_from_extra(&delta.extra, &thread_id, Some(&turn_id)) =>
1723 {
1724 let text = delta.delta.or(delta.text).unwrap_or_default();
1725 if text.is_empty() {
1726 continue;
1727 }
1728 let item = ThreadItem::Plan(PlanItem {
1729 id: delta.item_id.unwrap_or_default(),
1730 text,
1731 });
1732 if tx
1733 .send(Ok(ThreadEvent::ItemUpdated { item }))
1734 .await
1735 .is_err()
1736 {
1737 break;
1738 }
1739 }
1740 ServerNotification::TurnCompleted(payload)
1741 if payload.turn.id == turn_id
1742 && matches_target_from_extra(
1743 &payload.turn.extra,
1744 &thread_id,
1745 Some(&turn_id),
1746 ) =>
1747 {
1748 let status = payload.turn.status.unwrap_or_default().to_ascii_lowercase();
1749 if status == "failed" {
1750 let message = payload
1751 .turn
1752 .error
1753 .map(|error| error.message)
1754 .unwrap_or_else(|| "turn failed".to_string());
1755 let _ = tx
1756 .send(Ok(ThreadEvent::TurnFailed {
1757 error: ThreadError { message },
1758 }))
1759 .await;
1760 break;
1761 }
1762
1763 let usage = parse_usage_from_turn_extra(&payload.turn.extra)
1764 .or_else(|| latest_usage.clone());
1765 let _ = tx.send(Ok(ThreadEvent::TurnCompleted { usage })).await;
1766 break;
1767 }
1768 ServerNotification::ThreadTokenUsageUpdated(payload)
1769 if payload
1770 .thread_id
1771 .as_deref()
1772 .is_none_or(|incoming| incoming == thread_id) =>
1773 {
1774 if !matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) {
1775 continue;
1776 }
1777 latest_usage = payload
1778 .usage
1779 .as_ref()
1780 .and_then(parse_usage_from_value)
1781 .or_else(|| {
1782 payload
1783 .extra
1784 .get("tokenUsage")
1785 .and_then(parse_usage_from_value)
1786 });
1787 }
1788 ServerNotification::Error(payload)
1789 if matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) =>
1790 {
1791 let _ = tx
1792 .send(Ok(ThreadEvent::Error {
1793 message: payload.error.message,
1794 }))
1795 .await;
1796 }
1797 _ => {}
1798 },
1799 ServerEvent::TransportClosed => {
1800 let _ = tx.send(Err(ClientError::TransportClosed)).await;
1801 break;
1802 }
1803 ServerEvent::ServerRequest(_) => {}
1804 }
1805 }
1806}
1807
1808fn update_final_response_candidates(
1809 item: &ThreadItem,
1810 final_answer: &mut Option<String>,
1811 fallback_response: &mut Option<String>,
1812) {
1813 let ThreadItem::AgentMessage(agent_message) = item else {
1814 return;
1815 };
1816
1817 if agent_message.is_final_answer() {
1818 *final_answer = Some(agent_message.text.clone());
1819 } else {
1820 *fallback_response = Some(agent_message.text.clone());
1821 }
1822}
1823
1824fn select_latest_thread_id(
1825 threads: &[responses::ThreadSummary],
1826 working_directory: Option<&str>,
1827) -> Option<String> {
1828 let mut selected: Option<(Option<i64>, String)> = None;
1829
1830 for thread in threads {
1831 if !thread_matches_working_directory(thread, working_directory) {
1832 continue;
1833 }
1834
1835 let candidate = (thread_recency_score(thread), thread.id.clone());
1836 match &selected {
1837 None => selected = Some(candidate),
1838 Some((Some(best_score), _)) if candidate.0.is_some_and(|score| score > *best_score) => {
1842 selected = Some(candidate)
1843 }
1844 Some(_) => {}
1845 }
1846 }
1847
1848 selected.map(|(_, thread_id)| thread_id)
1849}
1850
1851fn thread_matches_working_directory(
1852 thread: &responses::ThreadSummary,
1853 working_directory: Option<&str>,
1854) -> bool {
1855 let Some(working_directory) = working_directory else {
1856 return true;
1857 };
1858
1859 thread.extra.get("cwd").and_then(Value::as_str) == Some(working_directory)
1860}
1861
1862fn thread_recency_score(thread: &responses::ThreadSummary) -> Option<i64> {
1863 parse_timestamp(thread.extra.get("updatedAt"))
1864 .or_else(|| parse_timestamp(thread.extra.get("createdAt")))
1865}
1866
1867fn parse_timestamp(value: Option<&Value>) -> Option<i64> {
1868 let value = value?;
1869 match value {
1870 Value::Number(number) => number
1871 .as_i64()
1872 .or_else(|| number.as_u64().and_then(|raw| i64::try_from(raw).ok())),
1873 Value::String(raw) => raw.parse::<i64>().ok(),
1874 _ => None,
1875 }
1876}
1877
1878fn build_thread_start_params(options: &ThreadOptions) -> requests::ThreadStartParams {
1879 let mut extra = Map::new();
1880 if let Some(skip) = options.skip_git_repo_check {
1881 extra.insert("skipGitRepoCheck".to_string(), Value::Bool(skip));
1882 }
1883 if let Some(mode) = options.web_search_mode {
1884 extra.insert(
1885 "webSearchMode".to_string(),
1886 Value::String(mode.as_str().to_string()),
1887 );
1888 }
1889 if let Some(enabled) = options.web_search_enabled {
1890 extra.insert("webSearchEnabled".to_string(), Value::Bool(enabled));
1891 }
1892 if let Some(network) = options.network_access_enabled {
1893 extra.insert("networkAccessEnabled".to_string(), Value::Bool(network));
1894 }
1895 if let Some(additional) = &options.additional_directories {
1896 extra.insert(
1897 "additionalDirectories".to_string(),
1898 Value::Array(
1899 additional
1900 .iter()
1901 .map(|entry| Value::String(entry.clone()))
1902 .collect(),
1903 ),
1904 );
1905 }
1906 if let Some(config) = &options.config {
1907 extra.insert("config".to_string(), Value::Object(config.clone()));
1908 }
1909 if let Some(dynamic_tools) = &options.dynamic_tools {
1910 extra.insert(
1911 "dynamicTools".to_string(),
1912 Value::Array(
1913 dynamic_tools
1914 .iter()
1915 .map(DynamicToolSpec::as_value)
1916 .collect(),
1917 ),
1918 );
1919 }
1920 if let Some(enabled) = options.experimental_raw_events {
1921 extra.insert("experimentalRawEvents".to_string(), Value::Bool(enabled));
1922 }
1923 if let Some(enabled) = options.persist_extended_history {
1924 extra.insert("persistExtendedHistory".to_string(), Value::Bool(enabled));
1925 }
1926
1927 requests::ThreadStartParams {
1928 model: options.model.clone(),
1929 model_provider: options.model_provider.clone(),
1930 cwd: options.working_directory.clone(),
1931 approval_policy: options
1932 .approval_policy
1933 .map(|mode| mode.as_str().to_string()),
1934 sandbox: options.sandbox_mode.map(|mode| mode.as_str().to_string()),
1935 sandbox_policy: options.sandbox_policy.clone(),
1936 effort: options
1937 .model_reasoning_effort
1938 .map(|effort| effort.as_str().to_string()),
1939 summary: options
1940 .model_reasoning_summary
1941 .map(|summary| summary.as_str().to_string()),
1942 personality: options.personality.map(|value| value.as_str().to_string()),
1943 ephemeral: options.ephemeral,
1944 base_instructions: options.base_instructions.clone(),
1945 developer_instructions: options.developer_instructions.clone(),
1946 extra,
1947 }
1948}
1949
1950fn build_thread_resume_params(
1951 thread_id: &str,
1952 options: &ThreadOptions,
1953) -> requests::ThreadResumeParams {
1954 let mut extra = Map::new();
1955 if let Some(skip) = options.skip_git_repo_check {
1956 extra.insert("skipGitRepoCheck".to_string(), Value::Bool(skip));
1957 }
1958 if let Some(mode) = options.web_search_mode {
1959 extra.insert(
1960 "webSearchMode".to_string(),
1961 Value::String(mode.as_str().to_string()),
1962 );
1963 }
1964 if let Some(enabled) = options.web_search_enabled {
1965 extra.insert("webSearchEnabled".to_string(), Value::Bool(enabled));
1966 }
1967 if let Some(network) = options.network_access_enabled {
1968 extra.insert("networkAccessEnabled".to_string(), Value::Bool(network));
1969 }
1970 if let Some(additional) = &options.additional_directories {
1971 extra.insert(
1972 "additionalDirectories".to_string(),
1973 Value::Array(
1974 additional
1975 .iter()
1976 .map(|entry| Value::String(entry.clone()))
1977 .collect(),
1978 ),
1979 );
1980 }
1981 if let Some(policy) = &options.sandbox_policy {
1982 extra.insert("sandboxPolicy".to_string(), policy.clone());
1983 }
1984 if let Some(effort) = options.model_reasoning_effort {
1985 extra.insert(
1986 "effort".to_string(),
1987 Value::String(effort.as_str().to_string()),
1988 );
1989 }
1990 if let Some(summary) = options.model_reasoning_summary {
1991 extra.insert(
1992 "summary".to_string(),
1993 Value::String(summary.as_str().to_string()),
1994 );
1995 }
1996 if let Some(ephemeral) = options.ephemeral {
1997 extra.insert("ephemeral".to_string(), Value::Bool(ephemeral));
1998 }
1999 if let Some(collaboration_mode) = &options.collaboration_mode {
2000 extra.insert(
2001 "collaborationMode".to_string(),
2002 collaboration_mode.as_value(),
2003 );
2004 }
2005 if let Some(dynamic_tools) = &options.dynamic_tools {
2006 extra.insert(
2007 "dynamicTools".to_string(),
2008 Value::Array(
2009 dynamic_tools
2010 .iter()
2011 .map(DynamicToolSpec::as_value)
2012 .collect(),
2013 ),
2014 );
2015 }
2016 if let Some(enabled) = options.experimental_raw_events {
2017 extra.insert("experimentalRawEvents".to_string(), Value::Bool(enabled));
2018 }
2019
2020 requests::ThreadResumeParams {
2021 thread_id: thread_id.to_string(),
2022 history: None,
2023 path: None,
2024 model: options.model.clone(),
2025 model_provider: options.model_provider.clone(),
2026 cwd: options.working_directory.clone(),
2027 approval_policy: options
2028 .approval_policy
2029 .map(|mode| mode.as_str().to_string()),
2030 sandbox: options.sandbox_mode.map(|mode| mode.as_str().to_string()),
2031 config: options.config.clone(),
2032 base_instructions: options.base_instructions.clone(),
2033 developer_instructions: options.developer_instructions.clone(),
2034 personality: options.personality.map(|value| value.as_str().to_string()),
2035 persist_extended_history: options.persist_extended_history,
2036 extra,
2037 }
2038}
2039
2040fn build_turn_start_params(
2041 thread_id: &str,
2042 input: Input,
2043 options: &ThreadOptions,
2044 turn_options: &TurnOptions,
2045) -> requests::TurnStartParams {
2046 let mut extra = Map::new();
2047 if let Some(skip) = turn_options
2048 .skip_git_repo_check
2049 .or(options.skip_git_repo_check)
2050 {
2051 extra.insert("skipGitRepoCheck".to_string(), Value::Bool(skip));
2052 }
2053 if let Some(mode) = turn_options.web_search_mode.or(options.web_search_mode) {
2054 extra.insert(
2055 "webSearchMode".to_string(),
2056 Value::String(mode.as_str().to_string()),
2057 );
2058 }
2059 if let Some(enabled) = turn_options
2060 .web_search_enabled
2061 .or(options.web_search_enabled)
2062 {
2063 extra.insert("webSearchEnabled".to_string(), Value::Bool(enabled));
2064 }
2065 if let Some(network) = turn_options
2066 .network_access_enabled
2067 .or(options.network_access_enabled)
2068 {
2069 extra.insert("networkAccessEnabled".to_string(), Value::Bool(network));
2070 }
2071 if let Some(additional) = turn_options
2072 .additional_directories
2073 .as_ref()
2074 .or(options.additional_directories.as_ref())
2075 {
2076 extra.insert(
2077 "additionalDirectories".to_string(),
2078 Value::Array(
2079 additional
2080 .iter()
2081 .map(|entry| Value::String(entry.clone()))
2082 .collect(),
2083 ),
2084 );
2085 }
2086 if let Some(collaboration_mode) = turn_options
2087 .collaboration_mode
2088 .as_ref()
2089 .or(options.collaboration_mode.as_ref())
2090 {
2091 extra.insert(
2092 "collaborationMode".to_string(),
2093 collaboration_mode.as_value(),
2094 );
2095 }
2096 if let Some(extra_overrides) = &turn_options.extra {
2097 for (key, value) in extra_overrides {
2098 extra.insert(key.clone(), value.clone());
2099 }
2100 }
2101
2102 requests::TurnStartParams {
2103 thread_id: thread_id.to_string(),
2104 input: normalize_input(input),
2105 cwd: turn_options
2106 .working_directory
2107 .clone()
2108 .or_else(|| options.working_directory.clone()),
2109 model: turn_options.model.clone().or_else(|| options.model.clone()),
2110 model_provider: turn_options
2111 .model_provider
2112 .clone()
2113 .or_else(|| options.model_provider.clone()),
2114 effort: turn_options
2115 .model_reasoning_effort
2116 .or(options.model_reasoning_effort)
2117 .map(|effort| effort.as_str().to_string()),
2118 summary: turn_options
2119 .model_reasoning_summary
2120 .or(options.model_reasoning_summary)
2121 .map(|summary| summary.as_str().to_string()),
2122 personality: turn_options
2123 .personality
2124 .or(options.personality)
2125 .map(|value| value.as_str().to_string()),
2126 output_schema: turn_options.output_schema.clone(),
2127 approval_policy: turn_options
2128 .approval_policy
2129 .or(options.approval_policy)
2130 .map(|mode| mode.as_str().to_string()),
2131 sandbox_policy: turn_options
2132 .sandbox_policy
2133 .clone()
2134 .or_else(|| options.sandbox_policy.clone()),
2135 collaboration_mode: None,
2136 extra,
2137 }
2138}
2139
2140fn normalize_input(input: Input) -> Vec<requests::TurnInputItem> {
2141 match input {
2142 Input::Text(text) => vec![requests::TurnInputItem::Text { text }],
2143 Input::Items(items) => {
2144 let mut text_parts = Vec::new();
2145 let mut normalized = Vec::new();
2146
2147 for item in items {
2148 match item {
2149 UserInput::Text { text } => text_parts.push(text),
2150 UserInput::LocalImage { path } => {
2151 normalized.push(requests::TurnInputItem::LocalImage { path });
2152 }
2153 }
2154 }
2155
2156 if !text_parts.is_empty() {
2157 normalized.insert(
2158 0,
2159 requests::TurnInputItem::Text {
2160 text: text_parts.join("\n\n"),
2161 },
2162 );
2163 }
2164
2165 normalized
2166 }
2167 }
2168}
2169
2170fn matches_target_from_extra(
2171 extra: &Map<String, Value>,
2172 thread_id: &str,
2173 turn_id: Option<&str>,
2174) -> bool {
2175 let thread_matches = extra
2176 .get("threadId")
2177 .and_then(Value::as_str)
2178 .map(|incoming| incoming == thread_id)
2179 .unwrap_or(true);
2180
2181 let turn_matches = match turn_id {
2182 Some(target_turn_id) => extra
2183 .get("turnId")
2184 .and_then(Value::as_str)
2185 .map(|incoming| incoming == target_turn_id)
2186 .unwrap_or(true),
2187 None => true,
2188 };
2189
2190 thread_matches && turn_matches
2191}
2192
2193fn parse_usage_from_turn_extra(extra: &Map<String, Value>) -> Option<Usage> {
2194 extra
2195 .get("usage")
2196 .and_then(parse_usage_from_value)
2197 .or_else(|| extra.get("tokenUsage").and_then(parse_usage_from_value))
2198}
2199
2200fn parse_usage_from_value(value: &Value) -> Option<Usage> {
2201 let object = value.as_object()?;
2202 if let Some(last) = object.get("last") {
2203 return parse_usage_from_value(last);
2204 }
2205
2206 let input_tokens = object.get("inputTokens").and_then(Value::as_i64)?;
2207 let cached_input_tokens = object
2208 .get("cachedInputTokens")
2209 .and_then(Value::as_i64)
2210 .unwrap_or(0);
2211 let output_tokens = object.get("outputTokens").and_then(Value::as_i64)?;
2212
2213 Some(Usage {
2214 input_tokens,
2215 cached_input_tokens,
2216 output_tokens,
2217 })
2218}
2219
2220fn parse_thread_item(item: Value) -> ThreadItem {
2221 let object = match item.as_object() {
2222 Some(object) => object,
2223 None => {
2224 return ThreadItem::Unknown(UnknownItem {
2225 id: None,
2226 item_type: None,
2227 raw: item,
2228 });
2229 }
2230 };
2231
2232 let item_type = object
2233 .get("type")
2234 .and_then(Value::as_str)
2235 .map(|value| value.to_string());
2236 let id = object
2237 .get("id")
2238 .and_then(Value::as_str)
2239 .map(|value| value.to_string());
2240
2241 match item_type.as_deref() {
2242 Some("agentMessage") => ThreadItem::AgentMessage(AgentMessageItem {
2243 id: id.unwrap_or_default(),
2244 text: object
2245 .get("text")
2246 .and_then(Value::as_str)
2247 .unwrap_or_default()
2248 .to_string(),
2249 phase: object
2250 .get("phase")
2251 .and_then(Value::as_str)
2252 .map(parse_agent_message_phase),
2253 }),
2254 Some("userMessage") => ThreadItem::UserMessage(UserMessageItem {
2255 id: id.unwrap_or_default(),
2256 content: object
2257 .get("content")
2258 .and_then(Value::as_array)
2259 .map(|entries| {
2260 entries
2261 .iter()
2262 .map(parse_user_message_content_item)
2263 .collect()
2264 })
2265 .unwrap_or_default(),
2266 }),
2267 Some("plan") => ThreadItem::Plan(PlanItem {
2268 id: id.unwrap_or_default(),
2269 text: object
2270 .get("text")
2271 .and_then(Value::as_str)
2272 .unwrap_or_default()
2273 .to_string(),
2274 }),
2275 Some("reasoning") => ThreadItem::Reasoning(ReasoningItem {
2276 id: id.unwrap_or_default(),
2277 text: parse_reasoning_text(object),
2278 }),
2279 Some("commandExecution") => ThreadItem::CommandExecution(CommandExecutionItem {
2280 id: id.unwrap_or_default(),
2281 command: object
2282 .get("command")
2283 .and_then(Value::as_str)
2284 .unwrap_or_default()
2285 .to_string(),
2286 aggregated_output: object
2287 .get("aggregatedOutput")
2288 .and_then(Value::as_str)
2289 .unwrap_or_default()
2290 .to_string(),
2291 exit_code: object
2292 .get("exitCode")
2293 .and_then(Value::as_i64)
2294 .map(|value| value as i32),
2295 status: parse_command_execution_status(
2296 object
2297 .get("status")
2298 .and_then(Value::as_str)
2299 .unwrap_or_default(),
2300 ),
2301 }),
2302 Some("fileChange") => ThreadItem::FileChange(FileChangeItem {
2303 id: id.unwrap_or_default(),
2304 changes: object
2305 .get("changes")
2306 .and_then(Value::as_array)
2307 .map(|changes| {
2308 changes
2309 .iter()
2310 .filter_map(parse_file_update_change)
2311 .collect::<Vec<_>>()
2312 })
2313 .unwrap_or_default(),
2314 status: parse_patch_apply_status(
2315 object
2316 .get("status")
2317 .and_then(Value::as_str)
2318 .unwrap_or_default(),
2319 ),
2320 }),
2321 Some("mcpToolCall") => {
2322 let error = object
2323 .get("error")
2324 .and_then(Value::as_object)
2325 .and_then(|error| error.get("message"))
2326 .and_then(Value::as_str)
2327 .map(|message| ThreadError {
2328 message: message.to_string(),
2329 });
2330
2331 ThreadItem::McpToolCall(McpToolCallItem {
2332 id: id.unwrap_or_default(),
2333 server: object
2334 .get("server")
2335 .and_then(Value::as_str)
2336 .unwrap_or_default()
2337 .to_string(),
2338 tool: object
2339 .get("tool")
2340 .and_then(Value::as_str)
2341 .unwrap_or_default()
2342 .to_string(),
2343 arguments: object.get("arguments").cloned().unwrap_or(Value::Null),
2344 result: object.get("result").cloned(),
2345 error,
2346 status: parse_mcp_tool_call_status(
2347 object
2348 .get("status")
2349 .and_then(Value::as_str)
2350 .unwrap_or_default(),
2351 ),
2352 })
2353 }
2354 Some("dynamicToolCall") => ThreadItem::DynamicToolCall(DynamicToolCallItem {
2355 id: id.unwrap_or_default(),
2356 tool: object
2357 .get("tool")
2358 .and_then(Value::as_str)
2359 .unwrap_or_default()
2360 .to_string(),
2361 arguments: object.get("arguments").cloned().unwrap_or(Value::Null),
2362 status: object
2363 .get("status")
2364 .and_then(Value::as_str)
2365 .unwrap_or_default()
2366 .to_string(),
2367 content_items: object
2368 .get("contentItems")
2369 .and_then(Value::as_array)
2370 .cloned()
2371 .unwrap_or_default(),
2372 success: object.get("success").and_then(Value::as_bool),
2373 duration_ms: object.get("durationMs").and_then(Value::as_u64),
2374 }),
2375 Some("collabToolCall") => ThreadItem::CollabToolCall(CollabToolCallItem {
2376 id: id.unwrap_or_default(),
2377 tool: object
2378 .get("tool")
2379 .and_then(Value::as_str)
2380 .unwrap_or_default()
2381 .to_string(),
2382 status: object
2383 .get("status")
2384 .and_then(Value::as_str)
2385 .unwrap_or_default()
2386 .to_string(),
2387 sender_thread_id: object
2388 .get("senderThreadId")
2389 .and_then(Value::as_str)
2390 .unwrap_or_default()
2391 .to_string(),
2392 receiver_thread_id: object
2393 .get("receiverThreadId")
2394 .and_then(Value::as_str)
2395 .map(str::to_string),
2396 new_thread_id: object
2397 .get("newThreadId")
2398 .and_then(Value::as_str)
2399 .map(str::to_string),
2400 prompt: object
2401 .get("prompt")
2402 .and_then(Value::as_str)
2403 .map(str::to_string),
2404 agent_status: object
2405 .get("agentStatus")
2406 .and_then(Value::as_str)
2407 .map(str::to_string),
2408 }),
2409 Some("webSearch") => ThreadItem::WebSearch(WebSearchItem {
2410 id: id.unwrap_or_default(),
2411 query: object
2412 .get("query")
2413 .and_then(Value::as_str)
2414 .unwrap_or_default()
2415 .to_string(),
2416 }),
2417 Some("imageView") => ThreadItem::ImageView(ImageViewItem {
2418 id: id.unwrap_or_default(),
2419 path: object
2420 .get("path")
2421 .and_then(Value::as_str)
2422 .unwrap_or_default()
2423 .to_string(),
2424 }),
2425 Some("enteredReviewMode") => ThreadItem::EnteredReviewMode(ReviewModeItem {
2426 id: id.unwrap_or_default(),
2427 review: object
2428 .get("review")
2429 .and_then(Value::as_str)
2430 .unwrap_or_default()
2431 .to_string(),
2432 }),
2433 Some("exitedReviewMode") => ThreadItem::ExitedReviewMode(ReviewModeItem {
2434 id: id.unwrap_or_default(),
2435 review: object
2436 .get("review")
2437 .and_then(Value::as_str)
2438 .unwrap_or_default()
2439 .to_string(),
2440 }),
2441 Some("contextCompaction") => ThreadItem::ContextCompaction(ContextCompactionItem {
2442 id: id.unwrap_or_default(),
2443 }),
2444 Some("todoList") => ThreadItem::TodoList(TodoListItem {
2445 id: id.unwrap_or_default(),
2446 items: object
2447 .get("items")
2448 .and_then(Value::as_array)
2449 .map(|items| items.iter().filter_map(parse_todo_item).collect())
2450 .unwrap_or_default(),
2451 }),
2452 Some("error") => ThreadItem::Error(ErrorItem {
2453 id: id.unwrap_or_default(),
2454 message: object
2455 .get("message")
2456 .and_then(Value::as_str)
2457 .unwrap_or_default()
2458 .to_string(),
2459 }),
2460 _ => ThreadItem::Unknown(UnknownItem {
2461 id,
2462 item_type,
2463 raw: item,
2464 }),
2465 }
2466}
2467
2468fn parse_agent_message_phase(value: &str) -> AgentMessagePhase {
2469 match value {
2470 "commentary" => AgentMessagePhase::Commentary,
2471 "final_answer" => AgentMessagePhase::FinalAnswer,
2472 _ => AgentMessagePhase::Unknown,
2473 }
2474}
2475
2476fn parse_user_message_content_item(value: &Value) -> UserMessageContentItem {
2477 let Some(object) = value.as_object() else {
2478 return UserMessageContentItem::Unknown(value.clone());
2479 };
2480
2481 match object.get("type").and_then(Value::as_str) {
2482 Some("text") => UserMessageContentItem::Text {
2483 text: object
2484 .get("text")
2485 .and_then(Value::as_str)
2486 .unwrap_or_default()
2487 .to_string(),
2488 },
2489 Some("image") => UserMessageContentItem::Image {
2490 url: object
2491 .get("url")
2492 .and_then(Value::as_str)
2493 .unwrap_or_default()
2494 .to_string(),
2495 },
2496 Some("localImage") => UserMessageContentItem::LocalImage {
2497 path: object
2498 .get("path")
2499 .and_then(Value::as_str)
2500 .unwrap_or_default()
2501 .to_string(),
2502 },
2503 _ => UserMessageContentItem::Unknown(value.clone()),
2504 }
2505}
2506
2507fn parse_reasoning_text(object: &Map<String, Value>) -> String {
2508 if let Some(text) = object.get("text").and_then(Value::as_str) {
2509 return text.to_string();
2510 }
2511
2512 let summary = object
2513 .get("summary")
2514 .and_then(Value::as_array)
2515 .map(|entries| {
2516 entries
2517 .iter()
2518 .filter_map(Value::as_str)
2519 .collect::<Vec<_>>()
2520 .join("\n")
2521 })
2522 .unwrap_or_default();
2523
2524 let content = object
2525 .get("content")
2526 .and_then(Value::as_array)
2527 .map(|entries| {
2528 entries
2529 .iter()
2530 .filter_map(Value::as_str)
2531 .collect::<Vec<_>>()
2532 .join("\n")
2533 })
2534 .unwrap_or_default();
2535
2536 if summary.is_empty() {
2537 content
2538 } else if content.is_empty() {
2539 summary
2540 } else {
2541 format!("{summary}\n{content}")
2542 }
2543}
2544
2545fn parse_file_update_change(change: &Value) -> Option<FileUpdateChange> {
2546 let object = change.as_object()?;
2547 let kind = match object.get("kind") {
2548 Some(Value::String(kind)) => parse_patch_change_kind(kind),
2549 Some(Value::Object(kind_object)) => kind_object
2550 .get("type")
2551 .and_then(Value::as_str)
2552 .map(parse_patch_change_kind)
2553 .unwrap_or(PatchChangeKind::Unknown),
2554 _ => PatchChangeKind::Unknown,
2555 };
2556
2557 Some(FileUpdateChange {
2558 path: object
2559 .get("path")
2560 .and_then(Value::as_str)
2561 .unwrap_or_default()
2562 .to_string(),
2563 kind,
2564 })
2565}
2566
2567fn parse_todo_item(value: &Value) -> Option<TodoItem> {
2568 let object = value.as_object()?;
2569 Some(TodoItem {
2570 text: object
2571 .get("text")
2572 .and_then(Value::as_str)
2573 .unwrap_or_default()
2574 .to_string(),
2575 completed: object
2576 .get("completed")
2577 .and_then(Value::as_bool)
2578 .unwrap_or(false),
2579 })
2580}
2581
2582fn parse_command_execution_status(status: &str) -> CommandExecutionStatus {
2583 match status {
2584 "inProgress" | "in_progress" => CommandExecutionStatus::InProgress,
2585 "completed" => CommandExecutionStatus::Completed,
2586 "failed" => CommandExecutionStatus::Failed,
2587 "declined" => CommandExecutionStatus::Declined,
2588 _ => CommandExecutionStatus::Unknown,
2589 }
2590}
2591
2592fn parse_patch_change_kind(kind: &str) -> PatchChangeKind {
2593 match kind {
2594 "add" => PatchChangeKind::Add,
2595 "delete" => PatchChangeKind::Delete,
2596 "update" => PatchChangeKind::Update,
2597 _ => PatchChangeKind::Unknown,
2598 }
2599}
2600
2601fn parse_patch_apply_status(status: &str) -> PatchApplyStatus {
2602 match status {
2603 "inProgress" | "in_progress" => PatchApplyStatus::InProgress,
2604 "completed" => PatchApplyStatus::Completed,
2605 "failed" => PatchApplyStatus::Failed,
2606 "declined" => PatchApplyStatus::Declined,
2607 _ => PatchApplyStatus::Unknown,
2608 }
2609}
2610
2611fn parse_mcp_tool_call_status(status: &str) -> McpToolCallStatus {
2612 match status {
2613 "inProgress" | "in_progress" => McpToolCallStatus::InProgress,
2614 "completed" => McpToolCallStatus::Completed,
2615 "failed" => McpToolCallStatus::Failed,
2616 _ => McpToolCallStatus::Unknown,
2617 }
2618}
2619
2620#[cfg(test)]
2621mod tests {
2622 use super::*;
2623 use schemars::JsonSchema;
2624 use serde::{Deserialize, Serialize};
2625 use serde_json::json;
2626
2627 #[derive(
2628 Debug,
2629 Clone,
2630 PartialEq,
2631 Eq,
2632 Serialize,
2633 Deserialize,
2634 JsonSchema,
2635 codex_app_server_sdk::OpenAiSerializable,
2636 )]
2637 struct StructuredReply {
2638 answer: String,
2639 }
2640
2641 fn thread_summary(
2642 id: &str,
2643 cwd: Option<&str>,
2644 updated_at: Option<i64>,
2645 created_at: Option<i64>,
2646 ) -> responses::ThreadSummary {
2647 let mut extra = Map::new();
2648 if let Some(cwd) = cwd {
2649 extra.insert("cwd".to_string(), Value::String(cwd.to_string()));
2650 }
2651 if let Some(updated_at) = updated_at {
2652 extra.insert("updatedAt".to_string(), Value::from(updated_at));
2653 }
2654 if let Some(created_at) = created_at {
2655 extra.insert("createdAt".to_string(), Value::from(created_at));
2656 }
2657
2658 responses::ThreadSummary {
2659 id: id.to_string(),
2660 extra,
2661 ..Default::default()
2662 }
2663 }
2664
2665 #[test]
2666 fn normalize_input_combines_text_and_images() {
2667 let normalized = normalize_input(Input::Items(vec![
2668 UserInput::Text {
2669 text: "first".to_string(),
2670 },
2671 UserInput::Text {
2672 text: "second".to_string(),
2673 },
2674 UserInput::LocalImage {
2675 path: "/tmp/one.png".to_string(),
2676 },
2677 UserInput::LocalImage {
2678 path: "/tmp/two.png".to_string(),
2679 },
2680 ]));
2681
2682 assert_eq!(normalized.len(), 3);
2683 match &normalized[0] {
2684 requests::TurnInputItem::Text { text } => assert_eq!(text, "first\n\nsecond"),
2685 other => panic!("expected text input item, got {other:?}"),
2686 }
2687 match &normalized[1] {
2688 requests::TurnInputItem::LocalImage { path } => assert_eq!(path, "/tmp/one.png"),
2689 other => panic!("expected local image input item, got {other:?}"),
2690 }
2691 match &normalized[2] {
2692 requests::TurnInputItem::LocalImage { path } => assert_eq!(path, "/tmp/two.png"),
2693 other => panic!("expected local image input item, got {other:?}"),
2694 }
2695 }
2696
2697 #[test]
2698 fn parse_agent_message_item() {
2699 let item = parse_thread_item(json!({
2700 "id": "item_1",
2701 "type": "agentMessage",
2702 "text": "hello",
2703 "phase": "final_answer"
2704 }));
2705
2706 assert_eq!(
2707 item,
2708 ThreadItem::AgentMessage(AgentMessageItem {
2709 id: "item_1".to_string(),
2710 text: "hello".to_string(),
2711 phase: Some(AgentMessagePhase::FinalAnswer),
2712 })
2713 );
2714 }
2715
2716 #[test]
2717 fn parse_missing_documented_thread_item_variants() {
2718 let cases = vec![
2719 (
2720 json!({
2721 "id": "user_1",
2722 "type": "userMessage",
2723 "content": [
2724 { "type": "text", "text": "hello" },
2725 { "type": "localImage", "path": "/tmp/example.png" }
2726 ]
2727 }),
2728 ThreadItem::UserMessage(UserMessageItem {
2729 id: "user_1".to_string(),
2730 content: vec![
2731 UserMessageContentItem::Text {
2732 text: "hello".to_string(),
2733 },
2734 UserMessageContentItem::LocalImage {
2735 path: "/tmp/example.png".to_string(),
2736 },
2737 ],
2738 }),
2739 ),
2740 (
2741 json!({
2742 "id": "plan_1",
2743 "type": "plan",
2744 "text": "1. inspect\n2. patch"
2745 }),
2746 ThreadItem::Plan(PlanItem {
2747 id: "plan_1".to_string(),
2748 text: "1. inspect\n2. patch".to_string(),
2749 }),
2750 ),
2751 (
2752 json!({
2753 "id": "tool_1",
2754 "type": "dynamicToolCall",
2755 "tool": "tool/search",
2756 "arguments": { "q": "rust" },
2757 "status": "completed",
2758 "contentItems": [{ "type": "text", "text": "done" }],
2759 "success": true,
2760 "durationMs": 12
2761 }),
2762 ThreadItem::DynamicToolCall(DynamicToolCallItem {
2763 id: "tool_1".to_string(),
2764 tool: "tool/search".to_string(),
2765 arguments: json!({ "q": "rust" }),
2766 status: "completed".to_string(),
2767 content_items: vec![json!({ "type": "text", "text": "done" })],
2768 success: Some(true),
2769 duration_ms: Some(12),
2770 }),
2771 ),
2772 (
2773 json!({
2774 "id": "collab_1",
2775 "type": "collabToolCall",
2776 "tool": "delegate",
2777 "status": "completed",
2778 "senderThreadId": "thr_a",
2779 "receiverThreadId": "thr_b",
2780 "newThreadId": "thr_c",
2781 "prompt": "review this",
2782 "agentStatus": "idle"
2783 }),
2784 ThreadItem::CollabToolCall(CollabToolCallItem {
2785 id: "collab_1".to_string(),
2786 tool: "delegate".to_string(),
2787 status: "completed".to_string(),
2788 sender_thread_id: "thr_a".to_string(),
2789 receiver_thread_id: Some("thr_b".to_string()),
2790 new_thread_id: Some("thr_c".to_string()),
2791 prompt: Some("review this".to_string()),
2792 agent_status: Some("idle".to_string()),
2793 }),
2794 ),
2795 (
2796 json!({
2797 "id": "image_1",
2798 "type": "imageView",
2799 "path": "/tmp/example.jpg"
2800 }),
2801 ThreadItem::ImageView(ImageViewItem {
2802 id: "image_1".to_string(),
2803 path: "/tmp/example.jpg".to_string(),
2804 }),
2805 ),
2806 (
2807 json!({
2808 "id": "review_1",
2809 "type": "enteredReviewMode",
2810 "review": "current changes"
2811 }),
2812 ThreadItem::EnteredReviewMode(ReviewModeItem {
2813 id: "review_1".to_string(),
2814 review: "current changes".to_string(),
2815 }),
2816 ),
2817 (
2818 json!({
2819 "id": "review_2",
2820 "type": "exitedReviewMode",
2821 "review": "looks good"
2822 }),
2823 ThreadItem::ExitedReviewMode(ReviewModeItem {
2824 id: "review_2".to_string(),
2825 review: "looks good".to_string(),
2826 }),
2827 ),
2828 (
2829 json!({
2830 "id": "compact_1",
2831 "type": "contextCompaction"
2832 }),
2833 ThreadItem::ContextCompaction(ContextCompactionItem {
2834 id: "compact_1".to_string(),
2835 }),
2836 ),
2837 ];
2838
2839 for (raw, expected) in cases {
2840 assert_eq!(parse_thread_item(raw), expected);
2841 }
2842 }
2843
2844 #[test]
2845 fn final_response_prefers_final_answer_phase() {
2846 let mut final_answer = None;
2847 let mut fallback_response = None;
2848
2849 update_final_response_candidates(
2850 &ThreadItem::AgentMessage(AgentMessageItem {
2851 id: "msg_1".to_string(),
2852 text: "thinking".to_string(),
2853 phase: Some(AgentMessagePhase::Commentary),
2854 }),
2855 &mut final_answer,
2856 &mut fallback_response,
2857 );
2858 update_final_response_candidates(
2859 &ThreadItem::AgentMessage(AgentMessageItem {
2860 id: "msg_2".to_string(),
2861 text: "final".to_string(),
2862 phase: Some(AgentMessagePhase::FinalAnswer),
2863 }),
2864 &mut final_answer,
2865 &mut fallback_response,
2866 );
2867
2868 assert_eq!(fallback_response.as_deref(), Some("thinking"));
2869 assert_eq!(final_answer.as_deref(), Some("final"));
2870 assert_eq!(
2871 final_answer.or(fallback_response),
2872 Some("final".to_string())
2873 );
2874 }
2875
2876 #[test]
2877 fn parse_usage_from_token_usage_payload() {
2878 let usage = parse_usage_from_value(&json!({
2879 "last": {
2880 "inputTokens": 10,
2881 "cachedInputTokens": 2,
2882 "outputTokens": 7
2883 }
2884 }));
2885
2886 assert_eq!(
2887 usage,
2888 Some(Usage {
2889 input_tokens: 10,
2890 cached_input_tokens: 2,
2891 output_tokens: 7,
2892 })
2893 );
2894 }
2895
2896 #[test]
2897 fn parse_unknown_item_preserves_payload() {
2898 let raw = json!({
2899 "id": "x",
2900 "type": "newItemType",
2901 "payload": {"a": 1}
2902 });
2903 let parsed = parse_thread_item(raw.clone());
2904
2905 match parsed {
2906 ThreadItem::Unknown(unknown) => {
2907 assert_eq!(unknown.id.as_deref(), Some("x"));
2908 assert_eq!(unknown.item_type.as_deref(), Some("newItemType"));
2909 assert_eq!(unknown.raw, raw);
2910 }
2911 other => panic!("expected unknown item, got {other:?}"),
2912 }
2913 }
2914
2915 #[test]
2916 fn select_latest_thread_id_prefers_newest_matching_working_directory() {
2917 let threads = vec![
2918 thread_summary("thread_other", Some("/tmp/other"), Some(300), None),
2919 thread_summary("thread_old", Some("/tmp/workspace"), None, Some(100)),
2920 thread_summary("thread_new", Some("/tmp/workspace"), Some(200), None),
2921 ];
2922
2923 assert_eq!(
2924 select_latest_thread_id(&threads, Some("/tmp/workspace")),
2925 Some("thread_new".to_string())
2926 );
2927 }
2928
2929 #[test]
2930 fn select_latest_thread_id_falls_back_to_first_matching_thread_without_timestamps() {
2931 let threads = vec![
2932 thread_summary("thread_other", Some("/tmp/other"), None, None),
2933 thread_summary("thread_match", Some("/tmp/workspace"), None, None),
2934 thread_summary("thread_match_two", Some("/tmp/workspace"), None, None),
2935 ];
2936
2937 assert_eq!(
2938 select_latest_thread_id(&threads, Some("/tmp/workspace")),
2939 Some("thread_match".to_string())
2940 );
2941 }
2942
2943 #[test]
2944 fn select_latest_thread_id_keeps_newest_listed_match_when_later_entry_only_has_timestamp() {
2945 let threads = vec![
2946 thread_summary("thread_newest", Some("/tmp/workspace"), None, None),
2947 thread_summary("thread_older", Some("/tmp/workspace"), Some(100), None),
2948 ];
2949
2950 assert_eq!(
2951 select_latest_thread_id(&threads, Some("/tmp/workspace")),
2952 Some("thread_newest".to_string())
2953 );
2954 }
2955
2956 #[test]
2957 fn thread_options_builder_maps_extended_protocol_fields() {
2958 let collaboration_mode = CollaborationMode::new(
2959 CollaborationModeKind::Default,
2960 CollaborationModeSettings::new("gpt-5.2-codex")
2961 .with_reasoning_effort(ModelReasoningEffort::High),
2962 );
2963
2964 let options = ThreadOptions::builder()
2965 .model("gpt-5.2-codex")
2966 .model_provider("mock_provider")
2967 .sandbox_mode(SandboxMode::WorkspaceWrite)
2968 .sandbox_policy(json!({"type": "dangerFullAccess"}))
2969 .working_directory("/tmp/workspace")
2970 .skip_git_repo_check(true)
2971 .model_reasoning_effort(ModelReasoningEffort::None)
2972 .model_reasoning_summary(ModelReasoningSummary::Auto)
2973 .network_access_enabled(true)
2974 .web_search_mode(WebSearchMode::Live)
2975 .web_search_enabled(false)
2976 .approval_policy(ApprovalMode::OnRequest)
2977 .add_directory("/tmp/one")
2978 .add_directory("/tmp/two")
2979 .personality(Personality::Pragmatic)
2980 .base_instructions("base instructions")
2981 .developer_instructions("developer instructions")
2982 .ephemeral(true)
2983 .insert_config("sandbox_workspace_write.network_access", Value::Bool(true))
2984 .dynamic_tools(vec![DynamicToolSpec::new(
2985 "demo_tool",
2986 "Demo dynamic tool",
2987 json!({"type": "object"}),
2988 )])
2989 .experimental_raw_events(true)
2990 .persist_extended_history(true)
2991 .collaboration_mode(collaboration_mode)
2992 .build();
2993
2994 let thread_params = build_thread_start_params(&options);
2995 assert_eq!(thread_params.model.as_deref(), Some("gpt-5.2-codex"));
2996 assert_eq!(
2997 thread_params.model_provider.as_deref(),
2998 Some("mock_provider")
2999 );
3000 assert_eq!(thread_params.cwd.as_deref(), Some("/tmp/workspace"));
3001 assert_eq!(thread_params.approval_policy.as_deref(), Some("on-request"));
3002 assert_eq!(thread_params.sandbox.as_deref(), Some("workspace-write"));
3003 assert_eq!(
3004 thread_params.sandbox_policy,
3005 Some(json!({"type": "dangerFullAccess"}))
3006 );
3007 assert_eq!(thread_params.effort.as_deref(), Some("none"));
3008 assert_eq!(thread_params.summary.as_deref(), Some("auto"));
3009 assert_eq!(thread_params.personality.as_deref(), Some("pragmatic"));
3010 assert_eq!(thread_params.ephemeral, Some(true));
3011 assert_eq!(
3012 thread_params.base_instructions.as_deref(),
3013 Some("base instructions")
3014 );
3015 assert_eq!(
3016 thread_params.developer_instructions.as_deref(),
3017 Some("developer instructions")
3018 );
3019 assert_eq!(
3020 thread_params.extra.get("skipGitRepoCheck"),
3021 Some(&Value::Bool(true))
3022 );
3023 assert_eq!(
3024 thread_params.extra.get("webSearchMode"),
3025 Some(&Value::String("live".to_string()))
3026 );
3027 assert_eq!(
3028 thread_params.extra.get("webSearchEnabled"),
3029 Some(&Value::Bool(false))
3030 );
3031 assert_eq!(
3032 thread_params.extra.get("networkAccessEnabled"),
3033 Some(&Value::Bool(true))
3034 );
3035 assert_eq!(
3036 thread_params.extra.get("additionalDirectories"),
3037 Some(&json!(["/tmp/one", "/tmp/two"]))
3038 );
3039 assert_eq!(
3040 thread_params.extra.get("config"),
3041 Some(&json!({"sandbox_workspace_write.network_access": true}))
3042 );
3043 assert_eq!(
3044 thread_params.extra.get("dynamicTools"),
3045 Some(&json!([{
3046 "name": "demo_tool",
3047 "description": "Demo dynamic tool",
3048 "inputSchema": {"type": "object"}
3049 }]))
3050 );
3051 assert_eq!(
3052 thread_params.extra.get("experimentalRawEvents"),
3053 Some(&Value::Bool(true))
3054 );
3055 assert_eq!(
3056 thread_params.extra.get("persistExtendedHistory"),
3057 Some(&Value::Bool(true))
3058 );
3059
3060 let resume_params = build_thread_resume_params("thread_123", &options);
3061 assert_eq!(resume_params.thread_id, "thread_123");
3062 assert_eq!(resume_params.model.as_deref(), Some("gpt-5.2-codex"));
3063 assert_eq!(
3064 resume_params.model_provider.as_deref(),
3065 Some("mock_provider")
3066 );
3067 assert_eq!(resume_params.cwd.as_deref(), Some("/tmp/workspace"));
3068 assert_eq!(resume_params.approval_policy.as_deref(), Some("on-request"));
3069 assert_eq!(resume_params.sandbox.as_deref(), Some("workspace-write"));
3070 assert_eq!(resume_params.personality.as_deref(), Some("pragmatic"));
3071 assert_eq!(
3072 resume_params
3073 .config
3074 .as_ref()
3075 .and_then(|config| config.get("sandbox_workspace_write.network_access")),
3076 Some(&Value::Bool(true))
3077 );
3078 assert_eq!(resume_params.persist_extended_history, Some(true));
3079 assert_eq!(
3080 resume_params.extra.get("sandboxPolicy"),
3081 Some(&json!({"type": "dangerFullAccess"}))
3082 );
3083 assert_eq!(
3084 resume_params.extra.get("effort"),
3085 Some(&Value::String("none".to_string()))
3086 );
3087 assert_eq!(
3088 resume_params.extra.get("summary"),
3089 Some(&Value::String("auto".to_string()))
3090 );
3091 assert_eq!(
3092 resume_params.extra.get("experimentalRawEvents"),
3093 Some(&Value::Bool(true))
3094 );
3095
3096 let turn_params = build_turn_start_params(
3097 "thread_123",
3098 Input::text("hello"),
3099 &options,
3100 &TurnOptions::default(),
3101 );
3102 assert_eq!(turn_params.model_provider.as_deref(), Some("mock_provider"));
3103 assert_eq!(turn_params.effort.as_deref(), Some("none"));
3104 assert_eq!(turn_params.summary.as_deref(), Some("auto"));
3105 assert_eq!(turn_params.personality.as_deref(), Some("pragmatic"));
3106 assert_eq!(
3107 turn_params.sandbox_policy,
3108 Some(json!({"type": "dangerFullAccess"}))
3109 );
3110 assert_eq!(
3111 turn_params.extra.get("collaborationMode"),
3112 Some(&json!({
3113 "mode": "default",
3114 "settings": {
3115 "model": "gpt-5.2-codex",
3116 "reasoning_effort": "high"
3117 }
3118 }))
3119 );
3120 }
3121
3122 #[test]
3123 fn thread_options_builder_skip_git_repo_check_matches_cli_flag_semantics() {
3124 let enabled = ThreadOptions::builder().skip_git_repo_check(true).build();
3125 let enabled_params = build_thread_start_params(&enabled);
3126 assert_eq!(
3127 enabled_params.extra.get("skipGitRepoCheck"),
3128 Some(&Value::Bool(true))
3129 );
3130
3131 let disabled = ThreadOptions::builder().skip_git_repo_check(false).build();
3132 let disabled_params = build_thread_start_params(&disabled);
3133 assert_eq!(
3134 disabled_params.extra.get("skipGitRepoCheck"),
3135 Some(&Value::Bool(false))
3136 );
3137 }
3138
3139 #[test]
3140 fn turn_options_builder_sets_typed_output_schema() {
3141 let turn_options = TurnOptions::builder()
3142 .output_schema_for::<StructuredReply>()
3143 .build();
3144 let turn_params = build_turn_start_params(
3145 "thread_123",
3146 Input::text("hello"),
3147 &ThreadOptions::default(),
3148 &turn_options,
3149 );
3150
3151 assert_eq!(
3152 turn_params.output_schema,
3153 Some(StructuredReply::openai_output_schema())
3154 );
3155 }
3156
3157 #[test]
3158 fn turn_options_builder_clear_output_schema_overrides_previous_value() {
3159 let turn_options = TurnOptions::builder()
3160 .output_schema(json!({"type": "object"}))
3161 .clear_output_schema()
3162 .build();
3163 let turn_params = build_turn_start_params(
3164 "thread_123",
3165 Input::text("hello"),
3166 &ThreadOptions::default(),
3167 &turn_options,
3168 );
3169
3170 assert_eq!(turn_params.output_schema, None);
3171 }
3172
3173 #[test]
3174 fn turn_options_value_helpers_set_raw_and_typed_schemas() {
3175 let raw = TurnOptions::default().with_output_schema(json!({"type": "object"}));
3176 assert_eq!(raw.output_schema, Some(json!({"type": "object"})));
3177
3178 let typed = TurnOptions::default().with_output_schema_for::<StructuredReply>();
3179 assert_eq!(
3180 typed.output_schema,
3181 Some(StructuredReply::openai_output_schema())
3182 );
3183 }
3184
3185 #[test]
3186 fn turn_options_builder_overrides_thread_defaults() {
3187 let thread_options = ThreadOptions::builder()
3188 .model("gpt-5-thread-default")
3189 .model_provider("provider-thread")
3190 .working_directory("/tmp/thread")
3191 .model_reasoning_effort(ModelReasoningEffort::Low)
3192 .model_reasoning_summary(ModelReasoningSummary::Auto)
3193 .personality(Personality::Friendly)
3194 .approval_policy(ApprovalMode::OnRequest)
3195 .sandbox_policy(json!({"thread": true}))
3196 .skip_git_repo_check(false)
3197 .network_access_enabled(false)
3198 .web_search_mode(WebSearchMode::Cached)
3199 .web_search_enabled(false)
3200 .add_directory("/tmp/thread-dir")
3201 .build();
3202
3203 let turn_options = TurnOptions::builder()
3204 .model("gpt-5-turn-override")
3205 .model_provider("provider-turn")
3206 .working_directory("/tmp/turn")
3207 .model_reasoning_effort(ModelReasoningEffort::High)
3208 .model_reasoning_summary(ModelReasoningSummary::Detailed)
3209 .personality(Personality::Pragmatic)
3210 .approval_policy(ApprovalMode::Never)
3211 .sandbox_policy(json!({"turn": true}))
3212 .skip_git_repo_check(true)
3213 .network_access_enabled(true)
3214 .web_search_mode(WebSearchMode::Live)
3215 .web_search_enabled(true)
3216 .add_directory("/tmp/turn-dir")
3217 .insert_extra("customTurnFlag", Value::Bool(true))
3218 .build();
3219
3220 let params = build_turn_start_params(
3221 "thread_123",
3222 Input::text("hello"),
3223 &thread_options,
3224 &turn_options,
3225 );
3226
3227 assert_eq!(params.cwd.as_deref(), Some("/tmp/turn"));
3228 assert_eq!(params.model.as_deref(), Some("gpt-5-turn-override"));
3229 assert_eq!(params.model_provider.as_deref(), Some("provider-turn"));
3230 assert_eq!(params.effort.as_deref(), Some("high"));
3231 assert_eq!(params.summary.as_deref(), Some("detailed"));
3232 assert_eq!(params.personality.as_deref(), Some("pragmatic"));
3233 assert_eq!(params.approval_policy.as_deref(), Some("never"));
3234 assert_eq!(params.sandbox_policy, Some(json!({"turn": true})));
3235 assert_eq!(
3236 params.extra.get("skipGitRepoCheck"),
3237 Some(&Value::Bool(true))
3238 );
3239 assert_eq!(
3240 params.extra.get("networkAccessEnabled"),
3241 Some(&Value::Bool(true))
3242 );
3243 assert_eq!(
3244 params.extra.get("webSearchMode"),
3245 Some(&Value::String("live".to_string()))
3246 );
3247 assert_eq!(
3248 params.extra.get("webSearchEnabled"),
3249 Some(&Value::Bool(true))
3250 );
3251 assert_eq!(
3252 params.extra.get("additionalDirectories"),
3253 Some(&json!(["/tmp/turn-dir"]))
3254 );
3255 assert_eq!(params.extra.get("customTurnFlag"), Some(&Value::Bool(true)));
3256 }
3257}