1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3
4use serde_json::{Map, Value};
5use thiserror::Error;
6use tokio::sync::{Mutex, mpsc};
7use tokio::task::JoinHandle;
8
9use crate::CodexClient;
10use crate::client::StdioConfig;
11use crate::client::WsConfig;
12use crate::client::{WsServerHandle, WsStartConfig};
13use crate::error::ClientError;
14use crate::events::{ServerEvent, ServerNotification};
15use crate::protocol::shared::EmptyObject;
16use crate::protocol::{requests, responses};
17use crate::schema::OpenAiSerializable;
18
19const THREAD_LIST_PAGE_LIMIT: u32 = 100;
20const MAX_THREAD_LIST_PAGES: usize = 100;
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub enum ApprovalMode {
24 Never,
25 OnRequest,
26 OnFailure,
27 Untrusted,
28}
29
30impl ApprovalMode {
31 fn as_str(self) -> &'static str {
32 match self {
33 Self::Never => "never",
34 Self::OnRequest => "on-request",
35 Self::OnFailure => "on-failure",
36 Self::Untrusted => "untrusted",
37 }
38 }
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum SandboxMode {
43 ReadOnly,
44 WorkspaceWrite,
45 DangerFullAccess,
46}
47
48impl SandboxMode {
49 fn as_str(self) -> &'static str {
50 match self {
51 Self::ReadOnly => "read-only",
52 Self::WorkspaceWrite => "workspace-write",
53 Self::DangerFullAccess => "danger-full-access",
54 }
55 }
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub enum ModelReasoningEffort {
60 None,
61 Minimal,
62 Low,
63 Medium,
64 High,
65 XHigh,
66}
67
68impl ModelReasoningEffort {
69 fn as_str(self) -> &'static str {
70 match self {
71 Self::None => "none",
72 Self::Minimal => "minimal",
73 Self::Low => "low",
74 Self::Medium => "medium",
75 Self::High => "high",
76 Self::XHigh => "xhigh",
77 }
78 }
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub enum ModelReasoningSummary {
83 None,
84 Auto,
85 Concise,
86 Detailed,
87}
88
89impl ModelReasoningSummary {
90 fn as_str(self) -> &'static str {
91 match self {
92 Self::None => "none",
93 Self::Auto => "auto",
94 Self::Concise => "concise",
95 Self::Detailed => "detailed",
96 }
97 }
98}
99
100#[derive(Debug, Clone, Copy, PartialEq, Eq)]
101pub enum Personality {
102 None,
103 Friendly,
104 Pragmatic,
105}
106
107impl Personality {
108 fn as_str(self) -> &'static str {
109 match self {
110 Self::None => "none",
111 Self::Friendly => "friendly",
112 Self::Pragmatic => "pragmatic",
113 }
114 }
115}
116
117#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118pub enum WebSearchMode {
119 Disabled,
120 Cached,
121 Live,
122}
123
124impl WebSearchMode {
125 fn as_str(self) -> &'static str {
126 match self {
127 Self::Disabled => "disabled",
128 Self::Cached => "cached",
129 Self::Live => "live",
130 }
131 }
132}
133
134#[derive(Debug, Clone, Copy, PartialEq, Eq)]
135pub enum CollaborationModeKind {
136 Plan,
137 Default,
138}
139
140impl CollaborationModeKind {
141 fn as_str(self) -> &'static str {
142 match self {
143 Self::Plan => "plan",
144 Self::Default => "default",
145 }
146 }
147}
148
149#[derive(Debug, Clone, PartialEq, Eq)]
150pub struct CollaborationModeSettings {
151 pub model: String,
152 pub reasoning_effort: Option<ModelReasoningEffort>,
153 pub developer_instructions: Option<String>,
154}
155
156impl CollaborationModeSettings {
157 pub fn new(model: impl Into<String>) -> Self {
158 Self {
159 model: model.into(),
160 reasoning_effort: None,
161 developer_instructions: None,
162 }
163 }
164
165 pub fn with_reasoning_effort(mut self, reasoning_effort: ModelReasoningEffort) -> Self {
166 self.reasoning_effort = Some(reasoning_effort);
167 self
168 }
169
170 pub fn with_developer_instructions(
171 mut self,
172 developer_instructions: impl Into<String>,
173 ) -> Self {
174 self.developer_instructions = Some(developer_instructions.into());
175 self
176 }
177}
178
179#[derive(Debug, Clone, PartialEq, Eq)]
180pub struct CollaborationMode {
181 pub mode: CollaborationModeKind,
182 pub settings: CollaborationModeSettings,
183}
184
185impl CollaborationMode {
186 pub fn new(mode: CollaborationModeKind, settings: CollaborationModeSettings) -> Self {
187 Self { mode, settings }
188 }
189
190 fn as_value(&self) -> Value {
191 let mut settings = Map::new();
192 settings.insert(
193 "model".to_string(),
194 Value::String(self.settings.model.clone()),
195 );
196 if let Some(reasoning_effort) = self.settings.reasoning_effort {
197 settings.insert(
198 "reasoning_effort".to_string(),
199 Value::String(reasoning_effort.as_str().to_string()),
200 );
201 }
202 if let Some(instructions) = &self.settings.developer_instructions {
203 settings.insert(
204 "developer_instructions".to_string(),
205 Value::String(instructions.clone()),
206 );
207 }
208
209 let mut value = Map::new();
210 value.insert(
211 "mode".to_string(),
212 Value::String(self.mode.as_str().to_string()),
213 );
214 value.insert("settings".to_string(), Value::Object(settings));
215 Value::Object(value)
216 }
217}
218
219#[derive(Debug, Clone, PartialEq)]
220pub struct DynamicToolSpec {
221 pub name: String,
222 pub description: String,
223 pub input_schema: Value,
224}
225
226impl DynamicToolSpec {
227 pub fn new(
228 name: impl Into<String>,
229 description: impl Into<String>,
230 input_schema: Value,
231 ) -> Self {
232 Self {
233 name: name.into(),
234 description: description.into(),
235 input_schema,
236 }
237 }
238
239 fn as_value(&self) -> Value {
240 let mut value = Map::new();
241 value.insert("name".to_string(), Value::String(self.name.clone()));
242 value.insert(
243 "description".to_string(),
244 Value::String(self.description.clone()),
245 );
246 value.insert("inputSchema".to_string(), self.input_schema.clone());
247 Value::Object(value)
248 }
249}
250
251#[derive(Debug, Clone, PartialEq, Eq)]
253pub enum ResumeThread {
254 Latest,
255 ById(String),
256}
257
258impl From<String> for ResumeThread {
259 fn from(value: String) -> Self {
260 Self::ById(value)
261 }
262}
263
264impl From<&str> for ResumeThread {
265 fn from(value: &str) -> Self {
266 Self::ById(value.to_string())
267 }
268}
269
270impl From<&String> for ResumeThread {
271 fn from(value: &String) -> Self {
272 Self::ById(value.clone())
273 }
274}
275
276#[derive(Debug, Clone, Default)]
277pub struct ThreadOptions {
278 pub model: Option<String>,
279 pub model_provider: Option<String>,
280 pub sandbox_mode: Option<SandboxMode>,
281 pub sandbox_policy: Option<Value>,
282 pub working_directory: Option<String>,
283 pub skip_git_repo_check: Option<bool>,
284 pub model_reasoning_effort: Option<ModelReasoningEffort>,
285 pub model_reasoning_summary: Option<ModelReasoningSummary>,
286 pub network_access_enabled: Option<bool>,
287 pub web_search_mode: Option<WebSearchMode>,
288 pub web_search_enabled: Option<bool>,
289 pub approval_policy: Option<ApprovalMode>,
290 pub additional_directories: Option<Vec<String>>,
291 pub personality: Option<Personality>,
292 pub base_instructions: Option<String>,
293 pub developer_instructions: Option<String>,
294 pub ephemeral: Option<bool>,
295 pub collaboration_mode: Option<CollaborationMode>,
296 pub config: Option<Map<String, Value>>,
297 pub dynamic_tools: Option<Vec<DynamicToolSpec>>,
298 pub experimental_raw_events: Option<bool>,
299 pub persist_extended_history: Option<bool>,
300}
301
302impl ThreadOptions {
303 pub fn builder() -> ThreadOptionsBuilder {
304 ThreadOptionsBuilder::new()
305 }
306}
307
308#[derive(Debug, Clone, Default)]
309pub struct ThreadOptionsBuilder {
310 options: ThreadOptions,
311}
312
313impl ThreadOptionsBuilder {
314 pub fn new() -> Self {
315 Self::default()
316 }
317
318 pub fn build(self) -> ThreadOptions {
319 self.options
320 }
321
322 pub fn model(mut self, model: impl Into<String>) -> Self {
323 self.options.model = Some(model.into());
324 self
325 }
326
327 pub fn model_provider(mut self, model_provider: impl Into<String>) -> Self {
328 self.options.model_provider = Some(model_provider.into());
329 self
330 }
331
332 pub fn sandbox_mode(mut self, sandbox_mode: SandboxMode) -> Self {
333 self.options.sandbox_mode = Some(sandbox_mode);
334 self
335 }
336
337 pub fn sandbox_policy(mut self, sandbox_policy: Value) -> Self {
338 self.options.sandbox_policy = Some(sandbox_policy);
339 self
340 }
341
342 pub fn working_directory(mut self, working_directory: impl Into<String>) -> Self {
343 self.options.working_directory = Some(working_directory.into());
344 self
345 }
346
347 pub fn skip_git_repo_check(mut self, enabled: bool) -> Self {
348 self.options.skip_git_repo_check = Some(enabled);
349 self
350 }
351
352 pub fn model_reasoning_effort(mut self, model_reasoning_effort: ModelReasoningEffort) -> Self {
353 self.options.model_reasoning_effort = Some(model_reasoning_effort);
354 self
355 }
356
357 pub fn model_reasoning_summary(
358 mut self,
359 model_reasoning_summary: ModelReasoningSummary,
360 ) -> Self {
361 self.options.model_reasoning_summary = Some(model_reasoning_summary);
362 self
363 }
364
365 pub fn network_access_enabled(mut self, enabled: bool) -> Self {
366 self.options.network_access_enabled = Some(enabled);
367 self
368 }
369
370 pub fn web_search_mode(mut self, web_search_mode: WebSearchMode) -> Self {
371 self.options.web_search_mode = Some(web_search_mode);
372 self
373 }
374
375 pub fn web_search_enabled(mut self, enabled: bool) -> Self {
376 self.options.web_search_enabled = Some(enabled);
377 self
378 }
379
380 pub fn approval_policy(mut self, approval_policy: ApprovalMode) -> Self {
381 self.options.approval_policy = Some(approval_policy);
382 self
383 }
384
385 pub fn additional_directories(mut self, additional_directories: Vec<String>) -> Self {
386 self.options.additional_directories = Some(additional_directories);
387 self
388 }
389
390 pub fn add_directory(mut self, directory: impl Into<String>) -> Self {
391 self.options
392 .additional_directories
393 .get_or_insert_with(Vec::new)
394 .push(directory.into());
395 self
396 }
397
398 pub fn personality(mut self, personality: Personality) -> Self {
399 self.options.personality = Some(personality);
400 self
401 }
402
403 pub fn base_instructions(mut self, base_instructions: impl Into<String>) -> Self {
404 self.options.base_instructions = Some(base_instructions.into());
405 self
406 }
407
408 pub fn developer_instructions(mut self, developer_instructions: impl Into<String>) -> Self {
409 self.options.developer_instructions = Some(developer_instructions.into());
410 self
411 }
412
413 pub fn ephemeral(mut self, ephemeral: bool) -> Self {
414 self.options.ephemeral = Some(ephemeral);
415 self
416 }
417
418 pub fn collaboration_mode(mut self, collaboration_mode: CollaborationMode) -> Self {
419 self.options.collaboration_mode = Some(collaboration_mode);
420 self
421 }
422
423 pub fn config(mut self, config: Map<String, Value>) -> Self {
424 self.options.config = Some(config);
425 self
426 }
427
428 pub fn insert_config(mut self, key: impl Into<String>, value: Value) -> Self {
429 self.options
430 .config
431 .get_or_insert_with(Map::new)
432 .insert(key.into(), value);
433 self
434 }
435
436 pub fn dynamic_tools(mut self, dynamic_tools: Vec<DynamicToolSpec>) -> Self {
437 self.options.dynamic_tools = Some(dynamic_tools);
438 self
439 }
440
441 pub fn experimental_raw_events(mut self, enabled: bool) -> Self {
442 self.options.experimental_raw_events = Some(enabled);
443 self
444 }
445
446 pub fn persist_extended_history(mut self, enabled: bool) -> Self {
447 self.options.persist_extended_history = Some(enabled);
448 self
449 }
450}
451
452#[derive(Debug, Clone, Default)]
453pub struct TurnOptions {
454 pub output_schema: Option<Value>,
455 pub working_directory: Option<String>,
456 pub model: Option<String>,
457 pub model_provider: Option<String>,
458 pub model_reasoning_effort: Option<ModelReasoningEffort>,
459 pub model_reasoning_summary: Option<ModelReasoningSummary>,
460 pub personality: Option<Personality>,
461 pub approval_policy: Option<ApprovalMode>,
462 pub sandbox_policy: Option<Value>,
463 pub collaboration_mode: Option<CollaborationMode>,
464 pub skip_git_repo_check: Option<bool>,
465 pub web_search_mode: Option<WebSearchMode>,
466 pub web_search_enabled: Option<bool>,
467 pub network_access_enabled: Option<bool>,
468 pub additional_directories: Option<Vec<String>>,
469 pub extra: Option<Map<String, Value>>,
470}
471
472impl TurnOptions {
473 pub fn builder() -> TurnOptionsBuilder {
474 TurnOptionsBuilder::new()
475 }
476
477 pub fn with_output_schema(mut self, output_schema: Value) -> Self {
478 self.output_schema = Some(output_schema);
479 self
480 }
481
482 pub fn with_output_schema_for<T: OpenAiSerializable>(mut self) -> Self {
483 self.output_schema = Some(T::openai_output_schema());
484 self
485 }
486
487 pub fn with_model(mut self, model: impl Into<String>) -> Self {
488 self.model = Some(model.into());
489 self
490 }
491
492 pub fn with_working_directory(mut self, working_directory: impl Into<String>) -> Self {
493 self.working_directory = Some(working_directory.into());
494 self
495 }
496}
497
498#[derive(Debug, Clone, Default)]
499pub struct TurnOptionsBuilder {
500 options: TurnOptions,
501}
502
503impl TurnOptionsBuilder {
504 pub fn new() -> Self {
505 Self::default()
506 }
507
508 pub fn build(self) -> TurnOptions {
509 self.options
510 }
511
512 pub fn output_schema(mut self, output_schema: Value) -> Self {
513 self.options.output_schema = Some(output_schema);
514 self
515 }
516
517 pub fn output_schema_for<T: OpenAiSerializable>(mut self) -> Self {
518 self.options.output_schema = Some(T::openai_output_schema());
519 self
520 }
521
522 pub fn clear_output_schema(mut self) -> Self {
523 self.options.output_schema = None;
524 self
525 }
526
527 pub fn working_directory(mut self, working_directory: impl Into<String>) -> Self {
528 self.options.working_directory = Some(working_directory.into());
529 self
530 }
531
532 pub fn model(mut self, model: impl Into<String>) -> Self {
533 self.options.model = Some(model.into());
534 self
535 }
536
537 pub fn model_provider(mut self, model_provider: impl Into<String>) -> Self {
538 self.options.model_provider = Some(model_provider.into());
539 self
540 }
541
542 pub fn model_reasoning_effort(mut self, model_reasoning_effort: ModelReasoningEffort) -> Self {
543 self.options.model_reasoning_effort = Some(model_reasoning_effort);
544 self
545 }
546
547 pub fn model_reasoning_summary(
548 mut self,
549 model_reasoning_summary: ModelReasoningSummary,
550 ) -> Self {
551 self.options.model_reasoning_summary = Some(model_reasoning_summary);
552 self
553 }
554
555 pub fn personality(mut self, personality: Personality) -> Self {
556 self.options.personality = Some(personality);
557 self
558 }
559
560 pub fn approval_policy(mut self, approval_policy: ApprovalMode) -> Self {
561 self.options.approval_policy = Some(approval_policy);
562 self
563 }
564
565 pub fn sandbox_policy(mut self, sandbox_policy: Value) -> Self {
566 self.options.sandbox_policy = Some(sandbox_policy);
567 self
568 }
569
570 pub fn collaboration_mode(mut self, collaboration_mode: CollaborationMode) -> Self {
571 self.options.collaboration_mode = Some(collaboration_mode);
572 self
573 }
574
575 pub fn skip_git_repo_check(mut self, enabled: bool) -> Self {
576 self.options.skip_git_repo_check = Some(enabled);
577 self
578 }
579
580 pub fn web_search_mode(mut self, web_search_mode: WebSearchMode) -> Self {
581 self.options.web_search_mode = Some(web_search_mode);
582 self
583 }
584
585 pub fn web_search_enabled(mut self, enabled: bool) -> Self {
586 self.options.web_search_enabled = Some(enabled);
587 self
588 }
589
590 pub fn network_access_enabled(mut self, enabled: bool) -> Self {
591 self.options.network_access_enabled = Some(enabled);
592 self
593 }
594
595 pub fn additional_directories(mut self, additional_directories: Vec<String>) -> Self {
596 self.options.additional_directories = Some(additional_directories);
597 self
598 }
599
600 pub fn add_directory(mut self, directory: impl Into<String>) -> Self {
601 self.options
602 .additional_directories
603 .get_or_insert_with(Vec::new)
604 .push(directory.into());
605 self
606 }
607
608 pub fn extra(mut self, extra: Map<String, Value>) -> Self {
609 self.options.extra = Some(extra);
610 self
611 }
612
613 pub fn insert_extra(mut self, key: impl Into<String>, value: Value) -> Self {
614 self.options
615 .extra
616 .get_or_insert_with(Map::new)
617 .insert(key.into(), value);
618 self
619 }
620}
621
622#[derive(Debug, Clone, PartialEq, Eq)]
623pub struct ThreadError {
624 pub message: String,
625}
626
627#[derive(Debug, Error)]
628pub enum ThreadRunError {
629 #[error(transparent)]
630 Client(#[from] ClientError),
631 #[error("{message}")]
632 TurnFailed { message: String },
633}
634
635#[derive(Debug, Clone, PartialEq, Eq)]
636pub enum UserInput {
637 Text { text: String },
638 LocalImage { path: String },
639}
640
641#[derive(Debug, Clone, PartialEq, Eq)]
642pub enum Input {
643 Text(String),
644 Items(Vec<UserInput>),
645}
646
647impl Input {
648 pub fn text(text: impl Into<String>) -> Self {
649 Self::Text(text.into())
650 }
651
652 pub fn items(items: Vec<UserInput>) -> Self {
653 Self::Items(items)
654 }
655}
656
657impl From<String> for Input {
658 fn from(value: String) -> Self {
659 Self::Text(value)
660 }
661}
662
663impl From<&str> for Input {
664 fn from(value: &str) -> Self {
665 Self::Text(value.to_string())
666 }
667}
668
669impl From<Vec<UserInput>> for Input {
670 fn from(value: Vec<UserInput>) -> Self {
671 Self::Items(value)
672 }
673}
674
675#[derive(Debug, Clone, PartialEq, Eq)]
676pub struct Usage {
677 pub input_tokens: i64,
678 pub cached_input_tokens: i64,
679 pub output_tokens: i64,
680}
681
682#[derive(Debug, Clone, PartialEq)]
683pub struct Turn {
684 pub items: Vec<ThreadItem>,
685 pub final_response: String,
686 pub usage: Option<Usage>,
687}
688
689pub type RunResult = Turn;
690
691pub struct StreamedTurn {
692 receiver: mpsc::Receiver<Result<ThreadEvent, ClientError>>,
693 task: JoinHandle<()>,
694}
695
696impl StreamedTurn {
697 pub async fn next_event(&mut self) -> Option<Result<ThreadEvent, ClientError>> {
698 self.receiver.recv().await
699 }
700}
701
702impl Drop for StreamedTurn {
703 fn drop(&mut self) {
704 self.task.abort();
705 }
706}
707
708#[derive(Debug, Clone, PartialEq)]
709pub enum ThreadEvent {
710 ThreadStarted { thread_id: String },
711 TurnStarted,
712 TurnCompleted { usage: Option<Usage> },
713 TurnFailed { error: ThreadError },
714 ItemStarted { item: ThreadItem },
715 ItemUpdated { item: ThreadItem },
716 ItemCompleted { item: ThreadItem },
717 Error { message: String },
718}
719
720#[derive(Debug, Clone, Copy, PartialEq, Eq)]
721pub enum AgentMessagePhase {
722 Commentary,
723 FinalAnswer,
724 Unknown,
725}
726
727impl AgentMessagePhase {
728 pub fn as_str(self) -> &'static str {
729 match self {
730 Self::Commentary => "commentary",
731 Self::FinalAnswer => "final_answer",
732 Self::Unknown => "unknown",
733 }
734 }
735}
736
737#[derive(Debug, Clone, PartialEq, Eq)]
738pub struct AgentMessageItem {
739 pub id: String,
740 pub text: String,
741 pub phase: Option<AgentMessagePhase>,
742}
743
744impl AgentMessageItem {
745 pub fn is_final_answer(&self) -> bool {
746 matches!(self.phase, Some(AgentMessagePhase::FinalAnswer))
747 }
748}
749
750#[derive(Debug, Clone, PartialEq)]
751pub enum UserMessageContentItem {
752 Text { text: String },
753 Image { url: String },
754 LocalImage { path: String },
755 Unknown(Value),
756}
757
758#[derive(Debug, Clone, PartialEq)]
759pub struct UserMessageItem {
760 pub id: String,
761 pub content: Vec<UserMessageContentItem>,
762}
763
764#[derive(Debug, Clone, PartialEq, Eq)]
765pub struct PlanItem {
766 pub id: String,
767 pub text: String,
768}
769
770#[derive(Debug, Clone, PartialEq, Eq)]
771pub struct ReasoningItem {
772 pub id: String,
773 pub text: String,
774}
775
776#[derive(Debug, Clone, Copy, PartialEq, Eq)]
777pub enum CommandExecutionStatus {
778 InProgress,
779 Completed,
780 Failed,
781 Declined,
782 Unknown,
783}
784
785#[derive(Debug, Clone, PartialEq, Eq)]
786pub struct CommandExecutionItem {
787 pub id: String,
788 pub command: String,
789 pub aggregated_output: String,
790 pub exit_code: Option<i32>,
791 pub status: CommandExecutionStatus,
792}
793
794#[derive(Debug, Clone, Copy, PartialEq, Eq)]
795pub enum PatchChangeKind {
796 Add,
797 Delete,
798 Update,
799 Unknown,
800}
801
802#[derive(Debug, Clone, PartialEq, Eq)]
803pub struct FileUpdateChange {
804 pub path: String,
805 pub kind: PatchChangeKind,
806}
807
808#[derive(Debug, Clone, Copy, PartialEq, Eq)]
809pub enum PatchApplyStatus {
810 InProgress,
811 Completed,
812 Failed,
813 Declined,
814 Unknown,
815}
816
817#[derive(Debug, Clone, PartialEq, Eq)]
818pub struct FileChangeItem {
819 pub id: String,
820 pub changes: Vec<FileUpdateChange>,
821 pub status: PatchApplyStatus,
822}
823
824#[derive(Debug, Clone, Copy, PartialEq, Eq)]
825pub enum McpToolCallStatus {
826 InProgress,
827 Completed,
828 Failed,
829 Unknown,
830}
831
832#[derive(Debug, Clone, PartialEq)]
833pub struct McpToolCallItem {
834 pub id: String,
835 pub server: String,
836 pub tool: String,
837 pub arguments: Value,
838 pub result: Option<Value>,
839 pub error: Option<ThreadError>,
840 pub status: McpToolCallStatus,
841}
842
843#[derive(Debug, Clone, PartialEq)]
844pub struct DynamicToolCallItem {
845 pub id: String,
846 pub tool: String,
847 pub arguments: Value,
848 pub status: String,
849 pub content_items: Vec<Value>,
850 pub success: Option<bool>,
851 pub duration_ms: Option<u64>,
852}
853
854#[derive(Debug, Clone, PartialEq, Eq)]
855pub struct CollabToolCallItem {
856 pub id: String,
857 pub tool: String,
858 pub status: String,
859 pub sender_thread_id: String,
860 pub receiver_thread_id: Option<String>,
861 pub new_thread_id: Option<String>,
862 pub prompt: Option<String>,
863 pub agent_status: Option<String>,
864}
865
866#[derive(Debug, Clone, PartialEq, Eq)]
867pub struct WebSearchItem {
868 pub id: String,
869 pub query: String,
870}
871
872#[derive(Debug, Clone, PartialEq, Eq)]
873pub struct ImageViewItem {
874 pub id: String,
875 pub path: String,
876}
877
878#[derive(Debug, Clone, PartialEq, Eq)]
879pub struct ReviewModeItem {
880 pub id: String,
881 pub review: String,
882}
883
884#[derive(Debug, Clone, PartialEq, Eq)]
885pub struct ContextCompactionItem {
886 pub id: String,
887}
888
889#[derive(Debug, Clone, PartialEq, Eq)]
890pub struct TodoItem {
891 pub text: String,
892 pub completed: bool,
893}
894
895#[derive(Debug, Clone, PartialEq, Eq)]
896pub struct TodoListItem {
897 pub id: String,
898 pub items: Vec<TodoItem>,
899}
900
901#[derive(Debug, Clone, PartialEq, Eq)]
902pub struct ErrorItem {
903 pub id: String,
904 pub message: String,
905}
906
907#[derive(Debug, Clone, PartialEq)]
908pub struct UnknownItem {
909 pub id: Option<String>,
910 pub item_type: Option<String>,
911 pub raw: Value,
912}
913
914#[derive(Debug, Clone, PartialEq)]
915pub enum ThreadItem {
916 AgentMessage(AgentMessageItem),
917 UserMessage(UserMessageItem),
918 Plan(PlanItem),
919 Reasoning(ReasoningItem),
920 CommandExecution(CommandExecutionItem),
921 FileChange(FileChangeItem),
922 McpToolCall(McpToolCallItem),
923 DynamicToolCall(DynamicToolCallItem),
924 CollabToolCall(CollabToolCallItem),
925 WebSearch(WebSearchItem),
926 ImageView(ImageViewItem),
927 EnteredReviewMode(ReviewModeItem),
928 ExitedReviewMode(ReviewModeItem),
929 ContextCompaction(ContextCompactionItem),
930 TodoList(TodoListItem),
931 Error(ErrorItem),
932 Unknown(UnknownItem),
933}
934
935macro_rules! codex_forward_typed_method {
936 ($method:ident, $params_ty:ty, $result_ty:ty) => {
937 pub async fn $method(&self, params: $params_ty) -> Result<$result_ty, ClientError> {
938 self.ensure_initialized().await?;
939 self.inner.client.$method(params).await
940 }
941 };
942}
943
944macro_rules! codex_forward_null_method {
945 ($method:ident, $result_ty:ty) => {
946 pub async fn $method(&self) -> Result<$result_ty, ClientError> {
947 self.ensure_initialized().await?;
948 self.inner.client.$method().await
949 }
950 };
951}
952
953#[derive(Clone)]
954pub struct Codex {
955 inner: Arc<CodexInner>,
956}
957
958struct CodexInner {
959 client: CodexClient,
960 initialize_params: requests::InitializeParams,
961 initialized: AtomicBool,
962 initialize_lock: Mutex<()>,
963}
964
965impl Codex {
966 pub fn with_initialize_params(
967 client: CodexClient,
968 initialize_params: requests::InitializeParams,
969 ) -> Self {
970 Self {
971 inner: Arc::new(CodexInner {
972 client,
973 initialize_params,
974 initialized: AtomicBool::new(false),
975 initialize_lock: Mutex::new(()),
976 }),
977 }
978 }
979
980 pub fn from_client(client: CodexClient) -> Self {
981 let initialize_params = requests::InitializeParams::new(requests::ClientInfo::new(
982 "codex_sdk_rs",
983 "Codex Rust SDK",
984 env!("CARGO_PKG_VERSION"),
985 ));
986 Self::with_initialize_params(client, initialize_params)
987 }
988
989 pub async fn spawn_stdio(config: StdioConfig) -> Result<Self, ClientError> {
990 let client = CodexClient::spawn_stdio(config).await?;
991 Ok(Self::from_client(client))
992 }
993
994 pub async fn connect_ws(config: WsConfig) -> Result<Self, ClientError> {
995 let client = CodexClient::connect_ws(config).await?;
996 Ok(Self::from_client(client))
997 }
998
999 pub async fn start_ws(config: WsStartConfig) -> Result<WsServerHandle, ClientError> {
1000 CodexClient::start_ws(config).await
1001 }
1002
1003 pub async fn start_ws_daemon(config: WsStartConfig) -> Result<WsServerHandle, ClientError> {
1004 CodexClient::start_ws_daemon(config).await
1005 }
1006
1007 pub async fn start_ws_blocking(config: WsStartConfig) -> Result<WsServerHandle, ClientError> {
1008 CodexClient::start_ws_blocking(config).await
1009 }
1010
1011 pub async fn start_and_connect_ws(config: WsConfig) -> Result<Self, ClientError> {
1012 let client = CodexClient::start_and_connect_ws(config).await?;
1013 Ok(Self::from_client(client))
1014 }
1015
1016 pub fn start_thread(&self, options: ThreadOptions) -> Thread {
1017 Thread {
1018 codex: self.clone(),
1019 id: None,
1020 pending_resume: None,
1021 last_turn_id: None,
1022 options,
1023 }
1024 }
1025
1026 pub fn resume_thread(&self, target: impl Into<ResumeThread>, options: ThreadOptions) -> Thread {
1027 let target = target.into();
1028 let id = match &target {
1029 ResumeThread::Latest => None,
1030 ResumeThread::ById(thread_id) => Some(thread_id.clone()),
1031 };
1032
1033 Thread {
1034 codex: self.clone(),
1035 id,
1036 pending_resume: Some(target),
1037 last_turn_id: None,
1038 options,
1039 }
1040 }
1041
1042 pub fn resume_thread_by_id(&self, id: impl Into<String>, options: ThreadOptions) -> Thread {
1043 self.resume_thread(ResumeThread::ById(id.into()), options)
1044 }
1045
1046 pub fn resume_latest_thread(&self, options: ThreadOptions) -> Thread {
1047 self.resume_thread(ResumeThread::Latest, options)
1048 }
1049
1050 pub async fn ask(&self, input: impl Into<Input>) -> Result<String, ThreadRunError> {
1052 self.ask_with_options(input, ThreadOptions::default(), TurnOptions::default())
1053 .await
1054 }
1055
1056 pub async fn ask_with_options(
1058 &self,
1059 input: impl Into<Input>,
1060 thread_options: ThreadOptions,
1061 turn_options: TurnOptions,
1062 ) -> Result<String, ThreadRunError> {
1063 let mut thread = self.start_thread(thread_options);
1064 thread.ask(input, turn_options).await
1065 }
1066
1067 pub async fn thread_list(
1069 &self,
1070 params: requests::ThreadListParams,
1071 ) -> Result<responses::ThreadListResult, ClientError> {
1072 self.ensure_initialized().await?;
1073 self.inner.client.thread_list(params).await
1074 }
1075
1076 codex_forward_typed_method!(
1077 thread_start,
1078 requests::ThreadStartParams,
1079 responses::ThreadResult
1080 );
1081 codex_forward_typed_method!(
1082 thread_resume,
1083 requests::ThreadResumeParams,
1084 responses::ThreadResult
1085 );
1086 codex_forward_typed_method!(
1087 thread_fork,
1088 requests::ThreadForkParams,
1089 responses::ThreadResult
1090 );
1091 codex_forward_typed_method!(
1092 thread_archive,
1093 requests::ThreadArchiveParams,
1094 responses::ThreadArchiveResult
1095 );
1096 codex_forward_typed_method!(
1097 thread_name_set,
1098 requests::ThreadSetNameParams,
1099 responses::ThreadSetNameResult
1100 );
1101 codex_forward_typed_method!(
1102 thread_unarchive,
1103 requests::ThreadUnarchiveParams,
1104 responses::ThreadUnarchiveResult
1105 );
1106 codex_forward_typed_method!(
1107 thread_compact_start,
1108 requests::ThreadCompactStartParams,
1109 responses::ThreadCompactStartResult
1110 );
1111 codex_forward_typed_method!(
1112 thread_background_terminals_clean,
1113 requests::ThreadBackgroundTerminalsCleanParams,
1114 responses::ThreadBackgroundTerminalsCleanResult
1115 );
1116 codex_forward_typed_method!(
1117 thread_rollback,
1118 requests::ThreadRollbackParams,
1119 responses::ThreadRollbackResult
1120 );
1121 codex_forward_typed_method!(
1122 thread_loaded_list,
1123 requests::ThreadLoadedListParams,
1124 responses::ThreadLoadedListResult
1125 );
1126 codex_forward_typed_method!(
1127 thread_read,
1128 requests::ThreadReadParams,
1129 responses::ThreadReadResult
1130 );
1131 codex_forward_typed_method!(
1132 skills_list,
1133 requests::SkillsListParams,
1134 responses::SkillsListResult
1135 );
1136 codex_forward_typed_method!(
1137 skills_remote_list,
1138 requests::SkillsRemoteReadParams,
1139 responses::SkillsRemoteReadResult
1140 );
1141 codex_forward_typed_method!(
1142 skills_remote_export,
1143 requests::SkillsRemoteWriteParams,
1144 responses::SkillsRemoteWriteResult
1145 );
1146 codex_forward_typed_method!(
1147 app_list,
1148 requests::AppsListParams,
1149 responses::AppsListResult
1150 );
1151 codex_forward_typed_method!(
1152 skills_config_write,
1153 requests::SkillsConfigWriteParams,
1154 responses::SkillsConfigWriteResult
1155 );
1156 codex_forward_typed_method!(turn_start, requests::TurnStartParams, responses::TurnResult);
1157 codex_forward_typed_method!(
1158 turn_steer,
1159 requests::TurnSteerParams,
1160 responses::TurnSteerResult
1161 );
1162 codex_forward_typed_method!(turn_interrupt, requests::TurnInterruptParams, EmptyObject);
1163 codex_forward_typed_method!(
1164 review_start,
1165 requests::ReviewStartParams,
1166 responses::ReviewStartResult
1167 );
1168 codex_forward_typed_method!(
1169 model_list,
1170 requests::ModelListParams,
1171 responses::ModelListResult
1172 );
1173 codex_forward_typed_method!(
1174 experimental_feature_list,
1175 requests::ExperimentalFeatureListParams,
1176 responses::ExperimentalFeatureListResult
1177 );
1178 codex_forward_typed_method!(
1179 collaboration_mode_list,
1180 requests::CollaborationModeListParams,
1181 responses::CollaborationModeListResult
1182 );
1183 codex_forward_typed_method!(
1184 mock_experimental_method,
1185 requests::MockExperimentalMethodParams,
1186 responses::MockExperimentalMethodResult
1187 );
1188 codex_forward_typed_method!(
1189 mcp_server_oauth_login,
1190 requests::McpServerOauthLoginParams,
1191 responses::McpServerOauthLoginResult
1192 );
1193 codex_forward_typed_method!(
1194 mcp_server_status_list,
1195 requests::ListMcpServerStatusParams,
1196 responses::McpServerStatusListResult
1197 );
1198 codex_forward_typed_method!(
1199 windows_sandbox_setup_start,
1200 requests::WindowsSandboxSetupStartParams,
1201 responses::WindowsSandboxSetupStartResult
1202 );
1203 codex_forward_typed_method!(
1204 account_login_start,
1205 requests::LoginAccountParams,
1206 responses::LoginAccountResult
1207 );
1208 codex_forward_typed_method!(
1209 account_login_cancel,
1210 requests::CancelLoginAccountParams,
1211 EmptyObject
1212 );
1213 codex_forward_typed_method!(
1214 feedback_upload,
1215 requests::FeedbackUploadParams,
1216 responses::FeedbackUploadResult
1217 );
1218 codex_forward_typed_method!(
1219 command_exec,
1220 requests::CommandExecParams,
1221 responses::CommandExecResult
1222 );
1223 codex_forward_typed_method!(
1224 config_read,
1225 requests::ConfigReadParams,
1226 responses::ConfigReadResult
1227 );
1228 codex_forward_typed_method!(
1229 config_value_write,
1230 requests::ConfigValueWriteParams,
1231 responses::ConfigValueWriteResult
1232 );
1233 codex_forward_typed_method!(
1234 config_batch_write,
1235 requests::ConfigBatchWriteParams,
1236 responses::ConfigBatchWriteResult
1237 );
1238 codex_forward_typed_method!(
1239 account_read,
1240 requests::GetAccountParams,
1241 responses::GetAccountResult
1242 );
1243 codex_forward_typed_method!(
1244 fuzzy_file_search_session_start,
1245 requests::FuzzyFileSearchSessionStartParams,
1246 responses::FuzzyFileSearchSessionStartResult
1247 );
1248 codex_forward_typed_method!(
1249 fuzzy_file_search_session_update,
1250 requests::FuzzyFileSearchSessionUpdateParams,
1251 responses::FuzzyFileSearchSessionUpdateResult
1252 );
1253 codex_forward_typed_method!(
1254 fuzzy_file_search_session_stop,
1255 requests::FuzzyFileSearchSessionStopParams,
1256 responses::FuzzyFileSearchSessionStopResult
1257 );
1258
1259 pub async fn skills_remote_read(
1260 &self,
1261 params: requests::SkillsRemoteReadParams,
1262 ) -> Result<responses::SkillsRemoteReadResult, ClientError> {
1263 self.skills_remote_list(params).await
1264 }
1265
1266 pub async fn skills_remote_write(
1267 &self,
1268 params: requests::SkillsRemoteWriteParams,
1269 ) -> Result<responses::SkillsRemoteWriteResult, ClientError> {
1270 self.skills_remote_export(params).await
1271 }
1272
1273 codex_forward_null_method!(config_mcp_server_reload, EmptyObject);
1274 codex_forward_null_method!(account_logout, EmptyObject);
1275 codex_forward_null_method!(
1276 account_rate_limits_read,
1277 responses::AccountRateLimitsReadResult
1278 );
1279 codex_forward_null_method!(
1280 config_requirements_read,
1281 responses::ConfigRequirementsReadResult
1282 );
1283
1284 pub async fn send_raw_request(
1285 &self,
1286 method: impl Into<String>,
1287 params: Value,
1288 timeout: Option<std::time::Duration>,
1289 ) -> Result<Value, ClientError> {
1290 self.ensure_initialized().await?;
1291 self.inner
1292 .client
1293 .send_raw_request(method, params, timeout)
1294 .await
1295 }
1296
1297 pub async fn send_raw_notification(
1298 &self,
1299 method: impl Into<String>,
1300 params: Value,
1301 ) -> Result<(), ClientError> {
1302 self.ensure_initialized().await?;
1303 self.inner
1304 .client
1305 .send_raw_notification(method, params)
1306 .await
1307 }
1308
1309 pub fn client(&self) -> CodexClient {
1310 self.inner.client.clone()
1311 }
1312
1313 async fn ensure_initialized(&self) -> Result<(), ClientError> {
1314 if self.inner.initialized.load(Ordering::SeqCst) {
1315 return Ok(());
1316 }
1317
1318 let _guard = self.inner.initialize_lock.lock().await;
1319 if self.inner.initialized.load(Ordering::SeqCst) {
1320 return Ok(());
1321 }
1322
1323 match self
1324 .inner
1325 .client
1326 .initialize(self.inner.initialize_params.clone())
1327 .await
1328 {
1329 Ok(_) | Err(ClientError::AlreadyInitialized) => {}
1330 Err(err) => return Err(err),
1331 }
1332
1333 self.inner.client.initialized().await?;
1334 self.inner.initialized.store(true, Ordering::SeqCst);
1335 Ok(())
1336 }
1337}
1338
1339pub struct Thread {
1340 codex: Codex,
1341 id: Option<String>,
1342 pending_resume: Option<ResumeThread>,
1343 last_turn_id: Option<String>,
1344 options: ThreadOptions,
1345}
1346
1347impl Thread {
1348 pub fn id(&self) -> Option<&str> {
1349 self.id.as_deref()
1350 }
1351
1352 async fn ensure_thread_started_or_resumed(
1353 &mut self,
1354 ) -> Result<(String, Option<String>), ClientError> {
1355 self.codex.ensure_initialized().await?;
1356
1357 let mut emit_thread_started = None;
1358 if let Some(pending_resume) = self.pending_resume.clone() {
1359 let thread_id = match pending_resume {
1360 ResumeThread::ById(thread_id) => thread_id,
1361 ResumeThread::Latest => self.resolve_latest_thread_id().await?,
1362 };
1363 let resume_params = build_thread_resume_params(&thread_id, &self.options);
1364 let resumed = self.codex.inner.client.thread_resume(resume_params).await?;
1365 self.id = Some(resumed.thread.id);
1366 self.pending_resume = None;
1367 self.last_turn_id = None;
1368 } else if self.id.is_none() {
1369 let thread = self
1370 .codex
1371 .inner
1372 .client
1373 .thread_start(build_thread_start_params(&self.options))
1374 .await?;
1375 self.id = Some(thread.thread.id.clone());
1376 emit_thread_started = Some(thread.thread.id);
1377 self.last_turn_id = None;
1378 }
1379
1380 let thread_id = self.id.clone().ok_or_else(|| {
1381 ClientError::TransportSend("thread id unavailable after start/resume".to_string())
1382 })?;
1383
1384 Ok((thread_id, emit_thread_started))
1385 }
1386
1387 async fn resolve_latest_thread_id(&self) -> Result<String, ClientError> {
1388 let mut cursor: Option<String> = None;
1389 let mut pages_scanned = 0usize;
1390 let mut threads = Vec::new();
1391
1392 loop {
1393 pages_scanned += 1;
1394 if pages_scanned > MAX_THREAD_LIST_PAGES {
1395 return Err(ClientError::TransportSend(format!(
1396 "could not resolve latest thread after scanning {MAX_THREAD_LIST_PAGES} pages"
1397 )));
1398 }
1399
1400 let result = self
1401 .codex
1402 .inner
1403 .client
1404 .thread_list(requests::ThreadListParams {
1405 limit: Some(THREAD_LIST_PAGE_LIMIT),
1406 cursor: cursor.clone(),
1407 ..Default::default()
1408 })
1409 .await?;
1410
1411 threads.extend(result.data);
1412
1413 match result.next_cursor {
1414 Some(next) => cursor = Some(next),
1415 None => break,
1416 }
1417 }
1418
1419 select_latest_thread_id(&threads, self.options.working_directory.as_deref()).ok_or_else(
1420 || match self.options.working_directory.as_deref() {
1421 Some(working_directory) => ClientError::TransportSend(format!(
1422 "no recorded thread found for working directory `{working_directory}`"
1423 )),
1424 None => ClientError::TransportSend("no recorded thread found".to_string()),
1425 },
1426 )
1427 }
1428
1429 pub async fn set_name(
1430 &mut self,
1431 name: impl Into<String>,
1432 ) -> Result<responses::ThreadSetNameResult, ClientError> {
1433 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1434 self.codex
1435 .inner
1436 .client
1437 .thread_name_set(requests::ThreadSetNameParams {
1438 thread_id,
1439 name: name.into(),
1440 extra: Map::new(),
1441 })
1442 .await
1443 }
1444
1445 pub async fn read(
1446 &mut self,
1447 include_turns: Option<bool>,
1448 ) -> Result<responses::ThreadReadResult, ClientError> {
1449 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1450 self.codex
1451 .inner
1452 .client
1453 .thread_read(requests::ThreadReadParams {
1454 thread_id,
1455 include_turns,
1456 extra: Map::new(),
1457 })
1458 .await
1459 }
1460
1461 pub async fn archive(&mut self) -> Result<responses::ThreadArchiveResult, ClientError> {
1462 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1463 self.codex
1464 .inner
1465 .client
1466 .thread_archive(requests::ThreadArchiveParams {
1467 thread_id,
1468 extra: Map::new(),
1469 })
1470 .await
1471 }
1472
1473 pub async fn unarchive(&mut self) -> Result<responses::ThreadUnarchiveResult, ClientError> {
1474 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1475 self.codex
1476 .inner
1477 .client
1478 .thread_unarchive(requests::ThreadUnarchiveParams {
1479 thread_id,
1480 extra: Map::new(),
1481 })
1482 .await
1483 }
1484
1485 pub async fn rollback(
1486 &mut self,
1487 count: u32,
1488 ) -> Result<responses::ThreadRollbackResult, ClientError> {
1489 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1490 self.codex
1491 .inner
1492 .client
1493 .thread_rollback(requests::ThreadRollbackParams {
1494 thread_id,
1495 count,
1496 extra: Map::new(),
1497 })
1498 .await
1499 }
1500
1501 pub async fn compact_start(
1502 &mut self,
1503 ) -> Result<responses::ThreadCompactStartResult, ClientError> {
1504 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1505 self.codex
1506 .inner
1507 .client
1508 .thread_compact_start(requests::ThreadCompactStartParams {
1509 thread_id,
1510 extra: Map::new(),
1511 })
1512 .await
1513 }
1514
1515 pub async fn steer(
1516 &mut self,
1517 input: impl Into<Input>,
1518 expected_turn_id: Option<String>,
1519 ) -> Result<responses::TurnSteerResult, ClientError> {
1520 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1521 let expected_turn_id = expected_turn_id.or_else(|| self.last_turn_id.clone());
1522 let expected_turn_id = expected_turn_id.ok_or_else(|| {
1523 ClientError::TransportSend(
1524 "turn/steer requires expected_turn_id or a previously started turn".to_string(),
1525 )
1526 })?;
1527 self.codex
1528 .inner
1529 .client
1530 .turn_steer(requests::TurnSteerParams {
1531 thread_id,
1532 input: normalize_input(input.into()),
1533 expected_turn_id: Some(expected_turn_id),
1534 extra: Map::new(),
1535 })
1536 .await
1537 }
1538
1539 pub async fn interrupt(&mut self, turn_id: impl Into<String>) -> Result<(), ClientError> {
1540 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1541 self.codex
1542 .inner
1543 .client
1544 .turn_interrupt(requests::TurnInterruptParams {
1545 thread_id,
1546 turn_id: turn_id.into(),
1547 extra: Map::new(),
1548 })
1549 .await?;
1550 Ok(())
1551 }
1552
1553 pub async fn run_streamed(
1554 &mut self,
1555 input: impl Into<Input>,
1556 turn_options: TurnOptions,
1557 ) -> Result<StreamedTurn, ClientError> {
1558 let (thread_id, emit_thread_started) = self.ensure_thread_started_or_resumed().await?;
1559
1560 let server_events = self.codex.inner.client.subscribe();
1561
1562 let turn_response = self
1563 .codex
1564 .inner
1565 .client
1566 .turn_start(build_turn_start_params(
1567 &thread_id,
1568 input.into(),
1569 &self.options,
1570 &turn_options,
1571 ))
1572 .await?;
1573 let turn_id = turn_response.turn.id;
1574 self.last_turn_id = Some(turn_id.clone());
1575
1576 let (tx, rx) = mpsc::channel(256);
1577
1578 if let Some(started_thread_id) = emit_thread_started
1579 && tx
1580 .send(Ok(ThreadEvent::ThreadStarted {
1581 thread_id: started_thread_id,
1582 }))
1583 .await
1584 .is_err()
1585 {
1586 return Err(ClientError::TransportClosed);
1587 }
1588
1589 if tx.send(Ok(ThreadEvent::TurnStarted)).await.is_err() {
1590 return Err(ClientError::TransportClosed);
1591 }
1592
1593 let task = tokio::spawn(async move {
1594 pump_turn_events(server_events, tx, thread_id, turn_id).await;
1595 });
1596
1597 Ok(StreamedTurn { receiver: rx, task })
1598 }
1599
1600 pub async fn run(
1601 &mut self,
1602 input: impl Into<Input>,
1603 turn_options: TurnOptions,
1604 ) -> Result<Turn, ThreadRunError> {
1605 let mut streamed = self.run_streamed(input, turn_options).await?;
1606 let mut items = Vec::new();
1607 let mut final_answer = None;
1608 let mut fallback_response = None;
1609 let mut usage = None;
1610 let mut saw_terminal = false;
1611
1612 while let Some(next) = streamed.next_event().await {
1613 let event = next.map_err(ThreadRunError::Client)?;
1614 match event {
1615 ThreadEvent::ItemCompleted { item } => {
1616 update_final_response_candidates(
1617 &item,
1618 &mut final_answer,
1619 &mut fallback_response,
1620 );
1621 items.push(item);
1622 }
1623 ThreadEvent::TurnCompleted { usage: completed } => {
1624 usage = completed;
1625 saw_terminal = true;
1626 break;
1627 }
1628 ThreadEvent::TurnFailed { error } => {
1629 return Err(ThreadRunError::TurnFailed {
1630 message: error.message,
1631 });
1632 }
1633 ThreadEvent::Error { message } => {
1634 return Err(ThreadRunError::TurnFailed { message });
1635 }
1636 ThreadEvent::ThreadStarted { .. }
1637 | ThreadEvent::TurnStarted
1638 | ThreadEvent::ItemStarted { .. }
1639 | ThreadEvent::ItemUpdated { .. } => {}
1640 }
1641 }
1642
1643 if !saw_terminal {
1644 return Err(ThreadRunError::Client(ClientError::TransportClosed));
1645 }
1646
1647 Ok(Turn {
1648 items,
1649 final_response: final_answer.or(fallback_response).unwrap_or_default(),
1650 usage,
1651 })
1652 }
1653
1654 pub async fn ask(
1656 &mut self,
1657 input: impl Into<Input>,
1658 turn_options: TurnOptions,
1659 ) -> Result<String, ThreadRunError> {
1660 let turn = self.run(input, turn_options).await?;
1661 Ok(turn.final_response)
1662 }
1663}
1664
1665async fn pump_turn_events(
1666 mut server_events: tokio::sync::broadcast::Receiver<ServerEvent>,
1667 tx: mpsc::Sender<Result<ThreadEvent, ClientError>>,
1668 thread_id: String,
1669 turn_id: String,
1670) {
1671 let mut latest_usage: Option<Usage> = None;
1672
1673 loop {
1674 let next = server_events.recv().await;
1675 let server_event = match next {
1676 Ok(event) => event,
1677 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
1678 let _ = tx.send(Err(ClientError::TransportClosed)).await;
1679 break;
1680 }
1681 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
1682 continue;
1683 }
1684 };
1685
1686 match server_event {
1687 ServerEvent::Notification(notification) => match notification {
1688 ServerNotification::ItemStarted(payload)
1689 if matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) =>
1690 {
1691 if tx
1692 .send(Ok(ThreadEvent::ItemStarted {
1693 item: parse_thread_item(payload.item),
1694 }))
1695 .await
1696 .is_err()
1697 {
1698 break;
1699 }
1700 }
1701 ServerNotification::ItemCompleted(payload)
1702 if matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) =>
1703 {
1704 if tx
1705 .send(Ok(ThreadEvent::ItemCompleted {
1706 item: parse_thread_item(payload.item),
1707 }))
1708 .await
1709 .is_err()
1710 {
1711 break;
1712 }
1713 }
1714 ServerNotification::ItemAgentMessageDelta(delta)
1715 if matches_target_from_extra(&delta.extra, &thread_id, Some(&turn_id)) =>
1716 {
1717 let text = delta.delta.or(delta.text).unwrap_or_default();
1718 if text.is_empty() {
1719 continue;
1720 }
1721 let item = ThreadItem::AgentMessage(AgentMessageItem {
1722 id: delta.item_id.unwrap_or_default(),
1723 text,
1724 phase: None,
1725 });
1726 if tx
1727 .send(Ok(ThreadEvent::ItemUpdated { item }))
1728 .await
1729 .is_err()
1730 {
1731 break;
1732 }
1733 }
1734 ServerNotification::ItemPlanDelta(delta)
1735 if matches_target_from_extra(&delta.extra, &thread_id, Some(&turn_id)) =>
1736 {
1737 let text = delta.delta.or(delta.text).unwrap_or_default();
1738 if text.is_empty() {
1739 continue;
1740 }
1741 let item = ThreadItem::Plan(PlanItem {
1742 id: delta.item_id.unwrap_or_default(),
1743 text,
1744 });
1745 if tx
1746 .send(Ok(ThreadEvent::ItemUpdated { item }))
1747 .await
1748 .is_err()
1749 {
1750 break;
1751 }
1752 }
1753 ServerNotification::TurnCompleted(payload)
1754 if payload.turn.id == turn_id
1755 && matches_target_from_extra(
1756 &payload.turn.extra,
1757 &thread_id,
1758 Some(&turn_id),
1759 ) =>
1760 {
1761 let status = payload.turn.status.unwrap_or_default().to_ascii_lowercase();
1762 if status == "failed" {
1763 let message = payload
1764 .turn
1765 .error
1766 .map(|error| error.message)
1767 .unwrap_or_else(|| "turn failed".to_string());
1768 let _ = tx
1769 .send(Ok(ThreadEvent::TurnFailed {
1770 error: ThreadError { message },
1771 }))
1772 .await;
1773 break;
1774 }
1775
1776 let usage = parse_usage_from_turn_extra(&payload.turn.extra)
1777 .or_else(|| latest_usage.clone());
1778 let _ = tx.send(Ok(ThreadEvent::TurnCompleted { usage })).await;
1779 break;
1780 }
1781 ServerNotification::ThreadTokenUsageUpdated(payload)
1782 if payload
1783 .thread_id
1784 .as_deref()
1785 .is_none_or(|incoming| incoming == thread_id) =>
1786 {
1787 if !matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) {
1788 continue;
1789 }
1790 latest_usage = payload
1791 .usage
1792 .as_ref()
1793 .and_then(parse_usage_from_value)
1794 .or_else(|| {
1795 payload
1796 .extra
1797 .get("tokenUsage")
1798 .and_then(parse_usage_from_value)
1799 });
1800 }
1801 ServerNotification::Error(payload)
1802 if matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) =>
1803 {
1804 let _ = tx
1805 .send(Ok(ThreadEvent::Error {
1806 message: payload.error.message,
1807 }))
1808 .await;
1809 }
1810 _ => {}
1811 },
1812 ServerEvent::TransportClosed => {
1813 let _ = tx.send(Err(ClientError::TransportClosed)).await;
1814 break;
1815 }
1816 ServerEvent::ServerRequest(_) => {}
1817 }
1818 }
1819}
1820
1821fn update_final_response_candidates(
1822 item: &ThreadItem,
1823 final_answer: &mut Option<String>,
1824 fallback_response: &mut Option<String>,
1825) {
1826 let ThreadItem::AgentMessage(agent_message) = item else {
1827 return;
1828 };
1829
1830 if agent_message.is_final_answer() {
1831 *final_answer = Some(agent_message.text.clone());
1832 } else {
1833 *fallback_response = Some(agent_message.text.clone());
1834 }
1835}
1836
1837fn select_latest_thread_id(
1838 threads: &[responses::ThreadSummary],
1839 working_directory: Option<&str>,
1840) -> Option<String> {
1841 let mut selected: Option<(Option<i64>, String)> = None;
1842
1843 for thread in threads {
1844 if !thread_matches_working_directory(thread, working_directory) {
1845 continue;
1846 }
1847
1848 let candidate = (thread_recency_score(thread), thread.id.clone());
1849 match &selected {
1850 None => selected = Some(candidate),
1851 Some((Some(best_score), _)) if candidate.0.is_some_and(|score| score > *best_score) => {
1855 selected = Some(candidate)
1856 }
1857 Some(_) => {}
1858 }
1859 }
1860
1861 selected.map(|(_, thread_id)| thread_id)
1862}
1863
1864fn thread_matches_working_directory(
1865 thread: &responses::ThreadSummary,
1866 working_directory: Option<&str>,
1867) -> bool {
1868 let Some(working_directory) = working_directory else {
1869 return true;
1870 };
1871
1872 thread.extra.get("cwd").and_then(Value::as_str) == Some(working_directory)
1873}
1874
1875fn thread_recency_score(thread: &responses::ThreadSummary) -> Option<i64> {
1876 parse_timestamp(thread.extra.get("updatedAt"))
1877 .or_else(|| parse_timestamp(thread.extra.get("createdAt")))
1878}
1879
1880fn parse_timestamp(value: Option<&Value>) -> Option<i64> {
1881 let value = value?;
1882 match value {
1883 Value::Number(number) => number
1884 .as_i64()
1885 .or_else(|| number.as_u64().and_then(|raw| i64::try_from(raw).ok())),
1886 Value::String(raw) => raw.parse::<i64>().ok(),
1887 _ => None,
1888 }
1889}
1890
1891fn build_thread_start_params(options: &ThreadOptions) -> requests::ThreadStartParams {
1892 let mut extra = Map::new();
1893 if let Some(skip) = options.skip_git_repo_check {
1894 extra.insert("skipGitRepoCheck".to_string(), Value::Bool(skip));
1895 }
1896 if let Some(mode) = options.web_search_mode {
1897 extra.insert(
1898 "webSearchMode".to_string(),
1899 Value::String(mode.as_str().to_string()),
1900 );
1901 }
1902 if let Some(enabled) = options.web_search_enabled {
1903 extra.insert("webSearchEnabled".to_string(), Value::Bool(enabled));
1904 }
1905 if let Some(network) = options.network_access_enabled {
1906 extra.insert("networkAccessEnabled".to_string(), Value::Bool(network));
1907 }
1908 if let Some(additional) = &options.additional_directories {
1909 extra.insert(
1910 "additionalDirectories".to_string(),
1911 Value::Array(
1912 additional
1913 .iter()
1914 .map(|entry| Value::String(entry.clone()))
1915 .collect(),
1916 ),
1917 );
1918 }
1919 if let Some(config) = &options.config {
1920 extra.insert("config".to_string(), Value::Object(config.clone()));
1921 }
1922 if let Some(dynamic_tools) = &options.dynamic_tools {
1923 extra.insert(
1924 "dynamicTools".to_string(),
1925 Value::Array(
1926 dynamic_tools
1927 .iter()
1928 .map(DynamicToolSpec::as_value)
1929 .collect(),
1930 ),
1931 );
1932 }
1933 if let Some(enabled) = options.experimental_raw_events {
1934 extra.insert("experimentalRawEvents".to_string(), Value::Bool(enabled));
1935 }
1936 if let Some(enabled) = options.persist_extended_history {
1937 extra.insert("persistExtendedHistory".to_string(), Value::Bool(enabled));
1938 }
1939
1940 requests::ThreadStartParams {
1941 model: options.model.clone(),
1942 model_provider: options.model_provider.clone(),
1943 cwd: options.working_directory.clone(),
1944 approval_policy: options
1945 .approval_policy
1946 .map(|mode| mode.as_str().to_string()),
1947 sandbox: options.sandbox_mode.map(|mode| mode.as_str().to_string()),
1948 sandbox_policy: options.sandbox_policy.clone(),
1949 effort: options
1950 .model_reasoning_effort
1951 .map(|effort| effort.as_str().to_string()),
1952 summary: options
1953 .model_reasoning_summary
1954 .map(|summary| summary.as_str().to_string()),
1955 personality: options.personality.map(|value| value.as_str().to_string()),
1956 ephemeral: options.ephemeral,
1957 base_instructions: options.base_instructions.clone(),
1958 developer_instructions: options.developer_instructions.clone(),
1959 extra,
1960 }
1961}
1962
1963fn build_thread_resume_params(
1964 thread_id: &str,
1965 options: &ThreadOptions,
1966) -> requests::ThreadResumeParams {
1967 let mut extra = Map::new();
1968 if let Some(skip) = options.skip_git_repo_check {
1969 extra.insert("skipGitRepoCheck".to_string(), Value::Bool(skip));
1970 }
1971 if let Some(mode) = options.web_search_mode {
1972 extra.insert(
1973 "webSearchMode".to_string(),
1974 Value::String(mode.as_str().to_string()),
1975 );
1976 }
1977 if let Some(enabled) = options.web_search_enabled {
1978 extra.insert("webSearchEnabled".to_string(), Value::Bool(enabled));
1979 }
1980 if let Some(network) = options.network_access_enabled {
1981 extra.insert("networkAccessEnabled".to_string(), Value::Bool(network));
1982 }
1983 if let Some(additional) = &options.additional_directories {
1984 extra.insert(
1985 "additionalDirectories".to_string(),
1986 Value::Array(
1987 additional
1988 .iter()
1989 .map(|entry| Value::String(entry.clone()))
1990 .collect(),
1991 ),
1992 );
1993 }
1994 if let Some(policy) = &options.sandbox_policy {
1995 extra.insert("sandboxPolicy".to_string(), policy.clone());
1996 }
1997 if let Some(effort) = options.model_reasoning_effort {
1998 extra.insert(
1999 "effort".to_string(),
2000 Value::String(effort.as_str().to_string()),
2001 );
2002 }
2003 if let Some(summary) = options.model_reasoning_summary {
2004 extra.insert(
2005 "summary".to_string(),
2006 Value::String(summary.as_str().to_string()),
2007 );
2008 }
2009 if let Some(ephemeral) = options.ephemeral {
2010 extra.insert("ephemeral".to_string(), Value::Bool(ephemeral));
2011 }
2012 if let Some(collaboration_mode) = &options.collaboration_mode {
2013 extra.insert(
2014 "collaborationMode".to_string(),
2015 collaboration_mode.as_value(),
2016 );
2017 }
2018 if let Some(dynamic_tools) = &options.dynamic_tools {
2019 extra.insert(
2020 "dynamicTools".to_string(),
2021 Value::Array(
2022 dynamic_tools
2023 .iter()
2024 .map(DynamicToolSpec::as_value)
2025 .collect(),
2026 ),
2027 );
2028 }
2029 if let Some(enabled) = options.experimental_raw_events {
2030 extra.insert("experimentalRawEvents".to_string(), Value::Bool(enabled));
2031 }
2032
2033 requests::ThreadResumeParams {
2034 thread_id: thread_id.to_string(),
2035 history: None,
2036 path: None,
2037 model: options.model.clone(),
2038 model_provider: options.model_provider.clone(),
2039 cwd: options.working_directory.clone(),
2040 approval_policy: options
2041 .approval_policy
2042 .map(|mode| mode.as_str().to_string()),
2043 sandbox: options.sandbox_mode.map(|mode| mode.as_str().to_string()),
2044 config: options.config.clone(),
2045 base_instructions: options.base_instructions.clone(),
2046 developer_instructions: options.developer_instructions.clone(),
2047 personality: options.personality.map(|value| value.as_str().to_string()),
2048 persist_extended_history: options.persist_extended_history,
2049 extra,
2050 }
2051}
2052
2053fn build_turn_start_params(
2054 thread_id: &str,
2055 input: Input,
2056 options: &ThreadOptions,
2057 turn_options: &TurnOptions,
2058) -> requests::TurnStartParams {
2059 let mut extra = Map::new();
2060 if let Some(skip) = turn_options
2061 .skip_git_repo_check
2062 .or(options.skip_git_repo_check)
2063 {
2064 extra.insert("skipGitRepoCheck".to_string(), Value::Bool(skip));
2065 }
2066 if let Some(mode) = turn_options.web_search_mode.or(options.web_search_mode) {
2067 extra.insert(
2068 "webSearchMode".to_string(),
2069 Value::String(mode.as_str().to_string()),
2070 );
2071 }
2072 if let Some(enabled) = turn_options
2073 .web_search_enabled
2074 .or(options.web_search_enabled)
2075 {
2076 extra.insert("webSearchEnabled".to_string(), Value::Bool(enabled));
2077 }
2078 if let Some(network) = turn_options
2079 .network_access_enabled
2080 .or(options.network_access_enabled)
2081 {
2082 extra.insert("networkAccessEnabled".to_string(), Value::Bool(network));
2083 }
2084 if let Some(additional) = turn_options
2085 .additional_directories
2086 .as_ref()
2087 .or(options.additional_directories.as_ref())
2088 {
2089 extra.insert(
2090 "additionalDirectories".to_string(),
2091 Value::Array(
2092 additional
2093 .iter()
2094 .map(|entry| Value::String(entry.clone()))
2095 .collect(),
2096 ),
2097 );
2098 }
2099 if let Some(collaboration_mode) = turn_options
2100 .collaboration_mode
2101 .as_ref()
2102 .or(options.collaboration_mode.as_ref())
2103 {
2104 extra.insert(
2105 "collaborationMode".to_string(),
2106 collaboration_mode.as_value(),
2107 );
2108 }
2109 if let Some(extra_overrides) = &turn_options.extra {
2110 for (key, value) in extra_overrides {
2111 extra.insert(key.clone(), value.clone());
2112 }
2113 }
2114
2115 requests::TurnStartParams {
2116 thread_id: thread_id.to_string(),
2117 input: normalize_input(input),
2118 cwd: turn_options
2119 .working_directory
2120 .clone()
2121 .or_else(|| options.working_directory.clone()),
2122 model: turn_options.model.clone().or_else(|| options.model.clone()),
2123 model_provider: turn_options
2124 .model_provider
2125 .clone()
2126 .or_else(|| options.model_provider.clone()),
2127 effort: turn_options
2128 .model_reasoning_effort
2129 .or(options.model_reasoning_effort)
2130 .map(|effort| effort.as_str().to_string()),
2131 summary: turn_options
2132 .model_reasoning_summary
2133 .or(options.model_reasoning_summary)
2134 .map(|summary| summary.as_str().to_string()),
2135 personality: turn_options
2136 .personality
2137 .or(options.personality)
2138 .map(|value| value.as_str().to_string()),
2139 output_schema: turn_options.output_schema.clone(),
2140 approval_policy: turn_options
2141 .approval_policy
2142 .or(options.approval_policy)
2143 .map(|mode| mode.as_str().to_string()),
2144 sandbox_policy: turn_options
2145 .sandbox_policy
2146 .clone()
2147 .or_else(|| options.sandbox_policy.clone()),
2148 collaboration_mode: None,
2149 extra,
2150 }
2151}
2152
2153fn normalize_input(input: Input) -> Vec<requests::TurnInputItem> {
2154 match input {
2155 Input::Text(text) => vec![requests::TurnInputItem::Text { text }],
2156 Input::Items(items) => {
2157 let mut text_parts = Vec::new();
2158 let mut normalized = Vec::new();
2159
2160 for item in items {
2161 match item {
2162 UserInput::Text { text } => text_parts.push(text),
2163 UserInput::LocalImage { path } => {
2164 normalized.push(requests::TurnInputItem::LocalImage { path });
2165 }
2166 }
2167 }
2168
2169 if !text_parts.is_empty() {
2170 normalized.insert(
2171 0,
2172 requests::TurnInputItem::Text {
2173 text: text_parts.join("\n\n"),
2174 },
2175 );
2176 }
2177
2178 normalized
2179 }
2180 }
2181}
2182
2183fn matches_target_from_extra(
2184 extra: &Map<String, Value>,
2185 thread_id: &str,
2186 turn_id: Option<&str>,
2187) -> bool {
2188 let thread_matches = extra
2189 .get("threadId")
2190 .and_then(Value::as_str)
2191 .map(|incoming| incoming == thread_id)
2192 .unwrap_or(true);
2193
2194 let turn_matches = match turn_id {
2195 Some(target_turn_id) => extra
2196 .get("turnId")
2197 .and_then(Value::as_str)
2198 .map(|incoming| incoming == target_turn_id)
2199 .unwrap_or(true),
2200 None => true,
2201 };
2202
2203 thread_matches && turn_matches
2204}
2205
2206fn parse_usage_from_turn_extra(extra: &Map<String, Value>) -> Option<Usage> {
2207 extra
2208 .get("usage")
2209 .and_then(parse_usage_from_value)
2210 .or_else(|| extra.get("tokenUsage").and_then(parse_usage_from_value))
2211}
2212
2213fn parse_usage_from_value(value: &Value) -> Option<Usage> {
2214 let object = value.as_object()?;
2215 if let Some(last) = object.get("last") {
2216 return parse_usage_from_value(last);
2217 }
2218
2219 let input_tokens = object.get("inputTokens").and_then(Value::as_i64)?;
2220 let cached_input_tokens = object
2221 .get("cachedInputTokens")
2222 .and_then(Value::as_i64)
2223 .unwrap_or(0);
2224 let output_tokens = object.get("outputTokens").and_then(Value::as_i64)?;
2225
2226 Some(Usage {
2227 input_tokens,
2228 cached_input_tokens,
2229 output_tokens,
2230 })
2231}
2232
2233fn parse_thread_item(item: Value) -> ThreadItem {
2234 let object = match item.as_object() {
2235 Some(object) => object,
2236 None => {
2237 return ThreadItem::Unknown(UnknownItem {
2238 id: None,
2239 item_type: None,
2240 raw: item,
2241 });
2242 }
2243 };
2244
2245 let item_type = object
2246 .get("type")
2247 .and_then(Value::as_str)
2248 .map(|value| value.to_string());
2249 let id = object
2250 .get("id")
2251 .and_then(Value::as_str)
2252 .map(|value| value.to_string());
2253
2254 match item_type.as_deref() {
2255 Some("agentMessage") => ThreadItem::AgentMessage(AgentMessageItem {
2256 id: id.unwrap_or_default(),
2257 text: object
2258 .get("text")
2259 .and_then(Value::as_str)
2260 .unwrap_or_default()
2261 .to_string(),
2262 phase: object
2263 .get("phase")
2264 .and_then(Value::as_str)
2265 .map(parse_agent_message_phase),
2266 }),
2267 Some("userMessage") => ThreadItem::UserMessage(UserMessageItem {
2268 id: id.unwrap_or_default(),
2269 content: object
2270 .get("content")
2271 .and_then(Value::as_array)
2272 .map(|entries| {
2273 entries
2274 .iter()
2275 .map(parse_user_message_content_item)
2276 .collect()
2277 })
2278 .unwrap_or_default(),
2279 }),
2280 Some("plan") => ThreadItem::Plan(PlanItem {
2281 id: id.unwrap_or_default(),
2282 text: object
2283 .get("text")
2284 .and_then(Value::as_str)
2285 .unwrap_or_default()
2286 .to_string(),
2287 }),
2288 Some("reasoning") => ThreadItem::Reasoning(ReasoningItem {
2289 id: id.unwrap_or_default(),
2290 text: parse_reasoning_text(object),
2291 }),
2292 Some("commandExecution") => ThreadItem::CommandExecution(CommandExecutionItem {
2293 id: id.unwrap_or_default(),
2294 command: object
2295 .get("command")
2296 .and_then(Value::as_str)
2297 .unwrap_or_default()
2298 .to_string(),
2299 aggregated_output: object
2300 .get("aggregatedOutput")
2301 .and_then(Value::as_str)
2302 .unwrap_or_default()
2303 .to_string(),
2304 exit_code: object
2305 .get("exitCode")
2306 .and_then(Value::as_i64)
2307 .map(|value| value as i32),
2308 status: parse_command_execution_status(
2309 object
2310 .get("status")
2311 .and_then(Value::as_str)
2312 .unwrap_or_default(),
2313 ),
2314 }),
2315 Some("fileChange") => ThreadItem::FileChange(FileChangeItem {
2316 id: id.unwrap_or_default(),
2317 changes: object
2318 .get("changes")
2319 .and_then(Value::as_array)
2320 .map(|changes| {
2321 changes
2322 .iter()
2323 .filter_map(parse_file_update_change)
2324 .collect::<Vec<_>>()
2325 })
2326 .unwrap_or_default(),
2327 status: parse_patch_apply_status(
2328 object
2329 .get("status")
2330 .and_then(Value::as_str)
2331 .unwrap_or_default(),
2332 ),
2333 }),
2334 Some("mcpToolCall") => {
2335 let error = object
2336 .get("error")
2337 .and_then(Value::as_object)
2338 .and_then(|error| error.get("message"))
2339 .and_then(Value::as_str)
2340 .map(|message| ThreadError {
2341 message: message.to_string(),
2342 });
2343
2344 ThreadItem::McpToolCall(McpToolCallItem {
2345 id: id.unwrap_or_default(),
2346 server: object
2347 .get("server")
2348 .and_then(Value::as_str)
2349 .unwrap_or_default()
2350 .to_string(),
2351 tool: object
2352 .get("tool")
2353 .and_then(Value::as_str)
2354 .unwrap_or_default()
2355 .to_string(),
2356 arguments: object.get("arguments").cloned().unwrap_or(Value::Null),
2357 result: object.get("result").cloned(),
2358 error,
2359 status: parse_mcp_tool_call_status(
2360 object
2361 .get("status")
2362 .and_then(Value::as_str)
2363 .unwrap_or_default(),
2364 ),
2365 })
2366 }
2367 Some("dynamicToolCall") => ThreadItem::DynamicToolCall(DynamicToolCallItem {
2368 id: id.unwrap_or_default(),
2369 tool: object
2370 .get("tool")
2371 .and_then(Value::as_str)
2372 .unwrap_or_default()
2373 .to_string(),
2374 arguments: object.get("arguments").cloned().unwrap_or(Value::Null),
2375 status: object
2376 .get("status")
2377 .and_then(Value::as_str)
2378 .unwrap_or_default()
2379 .to_string(),
2380 content_items: object
2381 .get("contentItems")
2382 .and_then(Value::as_array)
2383 .cloned()
2384 .unwrap_or_default(),
2385 success: object.get("success").and_then(Value::as_bool),
2386 duration_ms: object.get("durationMs").and_then(Value::as_u64),
2387 }),
2388 Some("collabToolCall") => ThreadItem::CollabToolCall(CollabToolCallItem {
2389 id: id.unwrap_or_default(),
2390 tool: object
2391 .get("tool")
2392 .and_then(Value::as_str)
2393 .unwrap_or_default()
2394 .to_string(),
2395 status: object
2396 .get("status")
2397 .and_then(Value::as_str)
2398 .unwrap_or_default()
2399 .to_string(),
2400 sender_thread_id: object
2401 .get("senderThreadId")
2402 .and_then(Value::as_str)
2403 .unwrap_or_default()
2404 .to_string(),
2405 receiver_thread_id: object
2406 .get("receiverThreadId")
2407 .and_then(Value::as_str)
2408 .map(str::to_string),
2409 new_thread_id: object
2410 .get("newThreadId")
2411 .and_then(Value::as_str)
2412 .map(str::to_string),
2413 prompt: object
2414 .get("prompt")
2415 .and_then(Value::as_str)
2416 .map(str::to_string),
2417 agent_status: object
2418 .get("agentStatus")
2419 .and_then(Value::as_str)
2420 .map(str::to_string),
2421 }),
2422 Some("webSearch") => ThreadItem::WebSearch(WebSearchItem {
2423 id: id.unwrap_or_default(),
2424 query: object
2425 .get("query")
2426 .and_then(Value::as_str)
2427 .unwrap_or_default()
2428 .to_string(),
2429 }),
2430 Some("imageView") => ThreadItem::ImageView(ImageViewItem {
2431 id: id.unwrap_or_default(),
2432 path: object
2433 .get("path")
2434 .and_then(Value::as_str)
2435 .unwrap_or_default()
2436 .to_string(),
2437 }),
2438 Some("enteredReviewMode") => ThreadItem::EnteredReviewMode(ReviewModeItem {
2439 id: id.unwrap_or_default(),
2440 review: object
2441 .get("review")
2442 .and_then(Value::as_str)
2443 .unwrap_or_default()
2444 .to_string(),
2445 }),
2446 Some("exitedReviewMode") => ThreadItem::ExitedReviewMode(ReviewModeItem {
2447 id: id.unwrap_or_default(),
2448 review: object
2449 .get("review")
2450 .and_then(Value::as_str)
2451 .unwrap_or_default()
2452 .to_string(),
2453 }),
2454 Some("contextCompaction") => ThreadItem::ContextCompaction(ContextCompactionItem {
2455 id: id.unwrap_or_default(),
2456 }),
2457 Some("todoList") => ThreadItem::TodoList(TodoListItem {
2458 id: id.unwrap_or_default(),
2459 items: object
2460 .get("items")
2461 .and_then(Value::as_array)
2462 .map(|items| items.iter().filter_map(parse_todo_item).collect())
2463 .unwrap_or_default(),
2464 }),
2465 Some("error") => ThreadItem::Error(ErrorItem {
2466 id: id.unwrap_or_default(),
2467 message: object
2468 .get("message")
2469 .and_then(Value::as_str)
2470 .unwrap_or_default()
2471 .to_string(),
2472 }),
2473 _ => ThreadItem::Unknown(UnknownItem {
2474 id,
2475 item_type,
2476 raw: item,
2477 }),
2478 }
2479}
2480
2481fn parse_agent_message_phase(value: &str) -> AgentMessagePhase {
2482 match value {
2483 "commentary" => AgentMessagePhase::Commentary,
2484 "final_answer" => AgentMessagePhase::FinalAnswer,
2485 _ => AgentMessagePhase::Unknown,
2486 }
2487}
2488
2489fn parse_user_message_content_item(value: &Value) -> UserMessageContentItem {
2490 let Some(object) = value.as_object() else {
2491 return UserMessageContentItem::Unknown(value.clone());
2492 };
2493
2494 match object.get("type").and_then(Value::as_str) {
2495 Some("text") => UserMessageContentItem::Text {
2496 text: object
2497 .get("text")
2498 .and_then(Value::as_str)
2499 .unwrap_or_default()
2500 .to_string(),
2501 },
2502 Some("image") => UserMessageContentItem::Image {
2503 url: object
2504 .get("url")
2505 .and_then(Value::as_str)
2506 .unwrap_or_default()
2507 .to_string(),
2508 },
2509 Some("localImage") => UserMessageContentItem::LocalImage {
2510 path: object
2511 .get("path")
2512 .and_then(Value::as_str)
2513 .unwrap_or_default()
2514 .to_string(),
2515 },
2516 _ => UserMessageContentItem::Unknown(value.clone()),
2517 }
2518}
2519
2520fn parse_reasoning_text(object: &Map<String, Value>) -> String {
2521 if let Some(text) = object.get("text").and_then(Value::as_str) {
2522 return text.to_string();
2523 }
2524
2525 let summary = object
2526 .get("summary")
2527 .and_then(Value::as_array)
2528 .map(|entries| {
2529 entries
2530 .iter()
2531 .filter_map(Value::as_str)
2532 .collect::<Vec<_>>()
2533 .join("\n")
2534 })
2535 .unwrap_or_default();
2536
2537 let content = object
2538 .get("content")
2539 .and_then(Value::as_array)
2540 .map(|entries| {
2541 entries
2542 .iter()
2543 .filter_map(Value::as_str)
2544 .collect::<Vec<_>>()
2545 .join("\n")
2546 })
2547 .unwrap_or_default();
2548
2549 if summary.is_empty() {
2550 content
2551 } else if content.is_empty() {
2552 summary
2553 } else {
2554 format!("{summary}\n{content}")
2555 }
2556}
2557
2558fn parse_file_update_change(change: &Value) -> Option<FileUpdateChange> {
2559 let object = change.as_object()?;
2560 let kind = match object.get("kind") {
2561 Some(Value::String(kind)) => parse_patch_change_kind(kind),
2562 Some(Value::Object(kind_object)) => kind_object
2563 .get("type")
2564 .and_then(Value::as_str)
2565 .map(parse_patch_change_kind)
2566 .unwrap_or(PatchChangeKind::Unknown),
2567 _ => PatchChangeKind::Unknown,
2568 };
2569
2570 Some(FileUpdateChange {
2571 path: object
2572 .get("path")
2573 .and_then(Value::as_str)
2574 .unwrap_or_default()
2575 .to_string(),
2576 kind,
2577 })
2578}
2579
2580fn parse_todo_item(value: &Value) -> Option<TodoItem> {
2581 let object = value.as_object()?;
2582 Some(TodoItem {
2583 text: object
2584 .get("text")
2585 .and_then(Value::as_str)
2586 .unwrap_or_default()
2587 .to_string(),
2588 completed: object
2589 .get("completed")
2590 .and_then(Value::as_bool)
2591 .unwrap_or(false),
2592 })
2593}
2594
2595fn parse_command_execution_status(status: &str) -> CommandExecutionStatus {
2596 match status {
2597 "inProgress" | "in_progress" => CommandExecutionStatus::InProgress,
2598 "completed" => CommandExecutionStatus::Completed,
2599 "failed" => CommandExecutionStatus::Failed,
2600 "declined" => CommandExecutionStatus::Declined,
2601 _ => CommandExecutionStatus::Unknown,
2602 }
2603}
2604
2605fn parse_patch_change_kind(kind: &str) -> PatchChangeKind {
2606 match kind {
2607 "add" => PatchChangeKind::Add,
2608 "delete" => PatchChangeKind::Delete,
2609 "update" => PatchChangeKind::Update,
2610 _ => PatchChangeKind::Unknown,
2611 }
2612}
2613
2614fn parse_patch_apply_status(status: &str) -> PatchApplyStatus {
2615 match status {
2616 "inProgress" | "in_progress" => PatchApplyStatus::InProgress,
2617 "completed" => PatchApplyStatus::Completed,
2618 "failed" => PatchApplyStatus::Failed,
2619 "declined" => PatchApplyStatus::Declined,
2620 _ => PatchApplyStatus::Unknown,
2621 }
2622}
2623
2624fn parse_mcp_tool_call_status(status: &str) -> McpToolCallStatus {
2625 match status {
2626 "inProgress" | "in_progress" => McpToolCallStatus::InProgress,
2627 "completed" => McpToolCallStatus::Completed,
2628 "failed" => McpToolCallStatus::Failed,
2629 _ => McpToolCallStatus::Unknown,
2630 }
2631}
2632
2633#[cfg(test)]
2634mod tests {
2635 use super::*;
2636 use schemars::JsonSchema;
2637 use serde::{Deserialize, Serialize};
2638 use serde_json::json;
2639
2640 #[derive(
2641 Debug,
2642 Clone,
2643 PartialEq,
2644 Eq,
2645 Serialize,
2646 Deserialize,
2647 JsonSchema,
2648 codex_app_server_sdk::OpenAiSerializable,
2649 )]
2650 struct StructuredReply {
2651 answer: String,
2652 }
2653
2654 fn thread_summary(
2655 id: &str,
2656 cwd: Option<&str>,
2657 updated_at: Option<i64>,
2658 created_at: Option<i64>,
2659 ) -> responses::ThreadSummary {
2660 let mut extra = Map::new();
2661 if let Some(cwd) = cwd {
2662 extra.insert("cwd".to_string(), Value::String(cwd.to_string()));
2663 }
2664 if let Some(updated_at) = updated_at {
2665 extra.insert("updatedAt".to_string(), Value::from(updated_at));
2666 }
2667 if let Some(created_at) = created_at {
2668 extra.insert("createdAt".to_string(), Value::from(created_at));
2669 }
2670
2671 responses::ThreadSummary {
2672 id: id.to_string(),
2673 extra,
2674 ..Default::default()
2675 }
2676 }
2677
2678 #[test]
2679 fn normalize_input_combines_text_and_images() {
2680 let normalized = normalize_input(Input::Items(vec![
2681 UserInput::Text {
2682 text: "first".to_string(),
2683 },
2684 UserInput::Text {
2685 text: "second".to_string(),
2686 },
2687 UserInput::LocalImage {
2688 path: "/tmp/one.png".to_string(),
2689 },
2690 UserInput::LocalImage {
2691 path: "/tmp/two.png".to_string(),
2692 },
2693 ]));
2694
2695 assert_eq!(normalized.len(), 3);
2696 match &normalized[0] {
2697 requests::TurnInputItem::Text { text } => assert_eq!(text, "first\n\nsecond"),
2698 other => panic!("expected text input item, got {other:?}"),
2699 }
2700 match &normalized[1] {
2701 requests::TurnInputItem::LocalImage { path } => assert_eq!(path, "/tmp/one.png"),
2702 other => panic!("expected local image input item, got {other:?}"),
2703 }
2704 match &normalized[2] {
2705 requests::TurnInputItem::LocalImage { path } => assert_eq!(path, "/tmp/two.png"),
2706 other => panic!("expected local image input item, got {other:?}"),
2707 }
2708 }
2709
2710 #[test]
2711 fn parse_agent_message_item() {
2712 let item = parse_thread_item(json!({
2713 "id": "item_1",
2714 "type": "agentMessage",
2715 "text": "hello",
2716 "phase": "final_answer"
2717 }));
2718
2719 assert_eq!(
2720 item,
2721 ThreadItem::AgentMessage(AgentMessageItem {
2722 id: "item_1".to_string(),
2723 text: "hello".to_string(),
2724 phase: Some(AgentMessagePhase::FinalAnswer),
2725 })
2726 );
2727 }
2728
2729 #[test]
2730 fn parse_missing_documented_thread_item_variants() {
2731 let cases = vec![
2732 (
2733 json!({
2734 "id": "user_1",
2735 "type": "userMessage",
2736 "content": [
2737 { "type": "text", "text": "hello" },
2738 { "type": "localImage", "path": "/tmp/example.png" }
2739 ]
2740 }),
2741 ThreadItem::UserMessage(UserMessageItem {
2742 id: "user_1".to_string(),
2743 content: vec![
2744 UserMessageContentItem::Text {
2745 text: "hello".to_string(),
2746 },
2747 UserMessageContentItem::LocalImage {
2748 path: "/tmp/example.png".to_string(),
2749 },
2750 ],
2751 }),
2752 ),
2753 (
2754 json!({
2755 "id": "plan_1",
2756 "type": "plan",
2757 "text": "1. inspect\n2. patch"
2758 }),
2759 ThreadItem::Plan(PlanItem {
2760 id: "plan_1".to_string(),
2761 text: "1. inspect\n2. patch".to_string(),
2762 }),
2763 ),
2764 (
2765 json!({
2766 "id": "tool_1",
2767 "type": "dynamicToolCall",
2768 "tool": "tool/search",
2769 "arguments": { "q": "rust" },
2770 "status": "completed",
2771 "contentItems": [{ "type": "text", "text": "done" }],
2772 "success": true,
2773 "durationMs": 12
2774 }),
2775 ThreadItem::DynamicToolCall(DynamicToolCallItem {
2776 id: "tool_1".to_string(),
2777 tool: "tool/search".to_string(),
2778 arguments: json!({ "q": "rust" }),
2779 status: "completed".to_string(),
2780 content_items: vec![json!({ "type": "text", "text": "done" })],
2781 success: Some(true),
2782 duration_ms: Some(12),
2783 }),
2784 ),
2785 (
2786 json!({
2787 "id": "collab_1",
2788 "type": "collabToolCall",
2789 "tool": "delegate",
2790 "status": "completed",
2791 "senderThreadId": "thr_a",
2792 "receiverThreadId": "thr_b",
2793 "newThreadId": "thr_c",
2794 "prompt": "review this",
2795 "agentStatus": "idle"
2796 }),
2797 ThreadItem::CollabToolCall(CollabToolCallItem {
2798 id: "collab_1".to_string(),
2799 tool: "delegate".to_string(),
2800 status: "completed".to_string(),
2801 sender_thread_id: "thr_a".to_string(),
2802 receiver_thread_id: Some("thr_b".to_string()),
2803 new_thread_id: Some("thr_c".to_string()),
2804 prompt: Some("review this".to_string()),
2805 agent_status: Some("idle".to_string()),
2806 }),
2807 ),
2808 (
2809 json!({
2810 "id": "image_1",
2811 "type": "imageView",
2812 "path": "/tmp/example.jpg"
2813 }),
2814 ThreadItem::ImageView(ImageViewItem {
2815 id: "image_1".to_string(),
2816 path: "/tmp/example.jpg".to_string(),
2817 }),
2818 ),
2819 (
2820 json!({
2821 "id": "review_1",
2822 "type": "enteredReviewMode",
2823 "review": "current changes"
2824 }),
2825 ThreadItem::EnteredReviewMode(ReviewModeItem {
2826 id: "review_1".to_string(),
2827 review: "current changes".to_string(),
2828 }),
2829 ),
2830 (
2831 json!({
2832 "id": "review_2",
2833 "type": "exitedReviewMode",
2834 "review": "looks good"
2835 }),
2836 ThreadItem::ExitedReviewMode(ReviewModeItem {
2837 id: "review_2".to_string(),
2838 review: "looks good".to_string(),
2839 }),
2840 ),
2841 (
2842 json!({
2843 "id": "compact_1",
2844 "type": "contextCompaction"
2845 }),
2846 ThreadItem::ContextCompaction(ContextCompactionItem {
2847 id: "compact_1".to_string(),
2848 }),
2849 ),
2850 ];
2851
2852 for (raw, expected) in cases {
2853 assert_eq!(parse_thread_item(raw), expected);
2854 }
2855 }
2856
2857 #[test]
2858 fn final_response_prefers_final_answer_phase() {
2859 let mut final_answer = None;
2860 let mut fallback_response = None;
2861
2862 update_final_response_candidates(
2863 &ThreadItem::AgentMessage(AgentMessageItem {
2864 id: "msg_1".to_string(),
2865 text: "thinking".to_string(),
2866 phase: Some(AgentMessagePhase::Commentary),
2867 }),
2868 &mut final_answer,
2869 &mut fallback_response,
2870 );
2871 update_final_response_candidates(
2872 &ThreadItem::AgentMessage(AgentMessageItem {
2873 id: "msg_2".to_string(),
2874 text: "final".to_string(),
2875 phase: Some(AgentMessagePhase::FinalAnswer),
2876 }),
2877 &mut final_answer,
2878 &mut fallback_response,
2879 );
2880
2881 assert_eq!(fallback_response.as_deref(), Some("thinking"));
2882 assert_eq!(final_answer.as_deref(), Some("final"));
2883 assert_eq!(
2884 final_answer.or(fallback_response),
2885 Some("final".to_string())
2886 );
2887 }
2888
2889 #[test]
2890 fn parse_usage_from_token_usage_payload() {
2891 let usage = parse_usage_from_value(&json!({
2892 "last": {
2893 "inputTokens": 10,
2894 "cachedInputTokens": 2,
2895 "outputTokens": 7
2896 }
2897 }));
2898
2899 assert_eq!(
2900 usage,
2901 Some(Usage {
2902 input_tokens: 10,
2903 cached_input_tokens: 2,
2904 output_tokens: 7,
2905 })
2906 );
2907 }
2908
2909 #[test]
2910 fn parse_unknown_item_preserves_payload() {
2911 let raw = json!({
2912 "id": "x",
2913 "type": "newItemType",
2914 "payload": {"a": 1}
2915 });
2916 let parsed = parse_thread_item(raw.clone());
2917
2918 match parsed {
2919 ThreadItem::Unknown(unknown) => {
2920 assert_eq!(unknown.id.as_deref(), Some("x"));
2921 assert_eq!(unknown.item_type.as_deref(), Some("newItemType"));
2922 assert_eq!(unknown.raw, raw);
2923 }
2924 other => panic!("expected unknown item, got {other:?}"),
2925 }
2926 }
2927
2928 #[test]
2929 fn select_latest_thread_id_prefers_newest_matching_working_directory() {
2930 let threads = vec![
2931 thread_summary("thread_other", Some("/tmp/other"), Some(300), None),
2932 thread_summary("thread_old", Some("/tmp/workspace"), None, Some(100)),
2933 thread_summary("thread_new", Some("/tmp/workspace"), Some(200), None),
2934 ];
2935
2936 assert_eq!(
2937 select_latest_thread_id(&threads, Some("/tmp/workspace")),
2938 Some("thread_new".to_string())
2939 );
2940 }
2941
2942 #[test]
2943 fn select_latest_thread_id_falls_back_to_first_matching_thread_without_timestamps() {
2944 let threads = vec![
2945 thread_summary("thread_other", Some("/tmp/other"), None, None),
2946 thread_summary("thread_match", Some("/tmp/workspace"), None, None),
2947 thread_summary("thread_match_two", Some("/tmp/workspace"), None, None),
2948 ];
2949
2950 assert_eq!(
2951 select_latest_thread_id(&threads, Some("/tmp/workspace")),
2952 Some("thread_match".to_string())
2953 );
2954 }
2955
2956 #[test]
2957 fn select_latest_thread_id_keeps_newest_listed_match_when_later_entry_only_has_timestamp() {
2958 let threads = vec![
2959 thread_summary("thread_newest", Some("/tmp/workspace"), None, None),
2960 thread_summary("thread_older", Some("/tmp/workspace"), Some(100), None),
2961 ];
2962
2963 assert_eq!(
2964 select_latest_thread_id(&threads, Some("/tmp/workspace")),
2965 Some("thread_newest".to_string())
2966 );
2967 }
2968
2969 #[test]
2970 fn thread_options_builder_maps_extended_protocol_fields() {
2971 let collaboration_mode = CollaborationMode::new(
2972 CollaborationModeKind::Default,
2973 CollaborationModeSettings::new("gpt-5.2-codex")
2974 .with_reasoning_effort(ModelReasoningEffort::High),
2975 );
2976
2977 let options = ThreadOptions::builder()
2978 .model("gpt-5.2-codex")
2979 .model_provider("mock_provider")
2980 .sandbox_mode(SandboxMode::WorkspaceWrite)
2981 .sandbox_policy(json!({"type": "dangerFullAccess"}))
2982 .working_directory("/tmp/workspace")
2983 .skip_git_repo_check(true)
2984 .model_reasoning_effort(ModelReasoningEffort::None)
2985 .model_reasoning_summary(ModelReasoningSummary::Auto)
2986 .network_access_enabled(true)
2987 .web_search_mode(WebSearchMode::Live)
2988 .web_search_enabled(false)
2989 .approval_policy(ApprovalMode::OnRequest)
2990 .add_directory("/tmp/one")
2991 .add_directory("/tmp/two")
2992 .personality(Personality::Pragmatic)
2993 .base_instructions("base instructions")
2994 .developer_instructions("developer instructions")
2995 .ephemeral(true)
2996 .insert_config("sandbox_workspace_write.network_access", Value::Bool(true))
2997 .dynamic_tools(vec![DynamicToolSpec::new(
2998 "demo_tool",
2999 "Demo dynamic tool",
3000 json!({"type": "object"}),
3001 )])
3002 .experimental_raw_events(true)
3003 .persist_extended_history(true)
3004 .collaboration_mode(collaboration_mode)
3005 .build();
3006
3007 let thread_params = build_thread_start_params(&options);
3008 assert_eq!(thread_params.model.as_deref(), Some("gpt-5.2-codex"));
3009 assert_eq!(
3010 thread_params.model_provider.as_deref(),
3011 Some("mock_provider")
3012 );
3013 assert_eq!(thread_params.cwd.as_deref(), Some("/tmp/workspace"));
3014 assert_eq!(thread_params.approval_policy.as_deref(), Some("on-request"));
3015 assert_eq!(thread_params.sandbox.as_deref(), Some("workspace-write"));
3016 assert_eq!(
3017 thread_params.sandbox_policy,
3018 Some(json!({"type": "dangerFullAccess"}))
3019 );
3020 assert_eq!(thread_params.effort.as_deref(), Some("none"));
3021 assert_eq!(thread_params.summary.as_deref(), Some("auto"));
3022 assert_eq!(thread_params.personality.as_deref(), Some("pragmatic"));
3023 assert_eq!(thread_params.ephemeral, Some(true));
3024 assert_eq!(
3025 thread_params.base_instructions.as_deref(),
3026 Some("base instructions")
3027 );
3028 assert_eq!(
3029 thread_params.developer_instructions.as_deref(),
3030 Some("developer instructions")
3031 );
3032 assert_eq!(
3033 thread_params.extra.get("skipGitRepoCheck"),
3034 Some(&Value::Bool(true))
3035 );
3036 assert_eq!(
3037 thread_params.extra.get("webSearchMode"),
3038 Some(&Value::String("live".to_string()))
3039 );
3040 assert_eq!(
3041 thread_params.extra.get("webSearchEnabled"),
3042 Some(&Value::Bool(false))
3043 );
3044 assert_eq!(
3045 thread_params.extra.get("networkAccessEnabled"),
3046 Some(&Value::Bool(true))
3047 );
3048 assert_eq!(
3049 thread_params.extra.get("additionalDirectories"),
3050 Some(&json!(["/tmp/one", "/tmp/two"]))
3051 );
3052 assert_eq!(
3053 thread_params.extra.get("config"),
3054 Some(&json!({"sandbox_workspace_write.network_access": true}))
3055 );
3056 assert_eq!(
3057 thread_params.extra.get("dynamicTools"),
3058 Some(&json!([{
3059 "name": "demo_tool",
3060 "description": "Demo dynamic tool",
3061 "inputSchema": {"type": "object"}
3062 }]))
3063 );
3064 assert_eq!(
3065 thread_params.extra.get("experimentalRawEvents"),
3066 Some(&Value::Bool(true))
3067 );
3068 assert_eq!(
3069 thread_params.extra.get("persistExtendedHistory"),
3070 Some(&Value::Bool(true))
3071 );
3072
3073 let resume_params = build_thread_resume_params("thread_123", &options);
3074 assert_eq!(resume_params.thread_id, "thread_123");
3075 assert_eq!(resume_params.model.as_deref(), Some("gpt-5.2-codex"));
3076 assert_eq!(
3077 resume_params.model_provider.as_deref(),
3078 Some("mock_provider")
3079 );
3080 assert_eq!(resume_params.cwd.as_deref(), Some("/tmp/workspace"));
3081 assert_eq!(resume_params.approval_policy.as_deref(), Some("on-request"));
3082 assert_eq!(resume_params.sandbox.as_deref(), Some("workspace-write"));
3083 assert_eq!(resume_params.personality.as_deref(), Some("pragmatic"));
3084 assert_eq!(
3085 resume_params
3086 .config
3087 .as_ref()
3088 .and_then(|config| config.get("sandbox_workspace_write.network_access")),
3089 Some(&Value::Bool(true))
3090 );
3091 assert_eq!(resume_params.persist_extended_history, Some(true));
3092 assert_eq!(
3093 resume_params.extra.get("sandboxPolicy"),
3094 Some(&json!({"type": "dangerFullAccess"}))
3095 );
3096 assert_eq!(
3097 resume_params.extra.get("effort"),
3098 Some(&Value::String("none".to_string()))
3099 );
3100 assert_eq!(
3101 resume_params.extra.get("summary"),
3102 Some(&Value::String("auto".to_string()))
3103 );
3104 assert_eq!(
3105 resume_params.extra.get("experimentalRawEvents"),
3106 Some(&Value::Bool(true))
3107 );
3108
3109 let turn_params = build_turn_start_params(
3110 "thread_123",
3111 Input::text("hello"),
3112 &options,
3113 &TurnOptions::default(),
3114 );
3115 assert_eq!(turn_params.model_provider.as_deref(), Some("mock_provider"));
3116 assert_eq!(turn_params.effort.as_deref(), Some("none"));
3117 assert_eq!(turn_params.summary.as_deref(), Some("auto"));
3118 assert_eq!(turn_params.personality.as_deref(), Some("pragmatic"));
3119 assert_eq!(
3120 turn_params.sandbox_policy,
3121 Some(json!({"type": "dangerFullAccess"}))
3122 );
3123 assert_eq!(
3124 turn_params.extra.get("collaborationMode"),
3125 Some(&json!({
3126 "mode": "default",
3127 "settings": {
3128 "model": "gpt-5.2-codex",
3129 "reasoning_effort": "high"
3130 }
3131 }))
3132 );
3133 }
3134
3135 #[test]
3136 fn thread_options_builder_skip_git_repo_check_matches_cli_flag_semantics() {
3137 let enabled = ThreadOptions::builder().skip_git_repo_check(true).build();
3138 let enabled_params = build_thread_start_params(&enabled);
3139 assert_eq!(
3140 enabled_params.extra.get("skipGitRepoCheck"),
3141 Some(&Value::Bool(true))
3142 );
3143
3144 let disabled = ThreadOptions::builder().skip_git_repo_check(false).build();
3145 let disabled_params = build_thread_start_params(&disabled);
3146 assert_eq!(
3147 disabled_params.extra.get("skipGitRepoCheck"),
3148 Some(&Value::Bool(false))
3149 );
3150 }
3151
3152 #[test]
3153 fn turn_options_builder_sets_typed_output_schema() {
3154 let turn_options = TurnOptions::builder()
3155 .output_schema_for::<StructuredReply>()
3156 .build();
3157 let turn_params = build_turn_start_params(
3158 "thread_123",
3159 Input::text("hello"),
3160 &ThreadOptions::default(),
3161 &turn_options,
3162 );
3163
3164 assert_eq!(
3165 turn_params.output_schema,
3166 Some(StructuredReply::openai_output_schema())
3167 );
3168 }
3169
3170 #[test]
3171 fn turn_options_builder_clear_output_schema_overrides_previous_value() {
3172 let turn_options = TurnOptions::builder()
3173 .output_schema(json!({"type": "object"}))
3174 .clear_output_schema()
3175 .build();
3176 let turn_params = build_turn_start_params(
3177 "thread_123",
3178 Input::text("hello"),
3179 &ThreadOptions::default(),
3180 &turn_options,
3181 );
3182
3183 assert_eq!(turn_params.output_schema, None);
3184 }
3185
3186 #[test]
3187 fn turn_options_value_helpers_set_raw_and_typed_schemas() {
3188 let raw = TurnOptions::default().with_output_schema(json!({"type": "object"}));
3189 assert_eq!(raw.output_schema, Some(json!({"type": "object"})));
3190
3191 let typed = TurnOptions::default().with_output_schema_for::<StructuredReply>();
3192 assert_eq!(
3193 typed.output_schema,
3194 Some(StructuredReply::openai_output_schema())
3195 );
3196 }
3197
3198 #[test]
3199 fn turn_options_builder_overrides_thread_defaults() {
3200 let thread_options = ThreadOptions::builder()
3201 .model("gpt-5-thread-default")
3202 .model_provider("provider-thread")
3203 .working_directory("/tmp/thread")
3204 .model_reasoning_effort(ModelReasoningEffort::Low)
3205 .model_reasoning_summary(ModelReasoningSummary::Auto)
3206 .personality(Personality::Friendly)
3207 .approval_policy(ApprovalMode::OnRequest)
3208 .sandbox_policy(json!({"thread": true}))
3209 .skip_git_repo_check(false)
3210 .network_access_enabled(false)
3211 .web_search_mode(WebSearchMode::Cached)
3212 .web_search_enabled(false)
3213 .add_directory("/tmp/thread-dir")
3214 .build();
3215
3216 let turn_options = TurnOptions::builder()
3217 .model("gpt-5-turn-override")
3218 .model_provider("provider-turn")
3219 .working_directory("/tmp/turn")
3220 .model_reasoning_effort(ModelReasoningEffort::High)
3221 .model_reasoning_summary(ModelReasoningSummary::Detailed)
3222 .personality(Personality::Pragmatic)
3223 .approval_policy(ApprovalMode::Never)
3224 .sandbox_policy(json!({"turn": true}))
3225 .skip_git_repo_check(true)
3226 .network_access_enabled(true)
3227 .web_search_mode(WebSearchMode::Live)
3228 .web_search_enabled(true)
3229 .add_directory("/tmp/turn-dir")
3230 .insert_extra("customTurnFlag", Value::Bool(true))
3231 .build();
3232
3233 let params = build_turn_start_params(
3234 "thread_123",
3235 Input::text("hello"),
3236 &thread_options,
3237 &turn_options,
3238 );
3239
3240 assert_eq!(params.cwd.as_deref(), Some("/tmp/turn"));
3241 assert_eq!(params.model.as_deref(), Some("gpt-5-turn-override"));
3242 assert_eq!(params.model_provider.as_deref(), Some("provider-turn"));
3243 assert_eq!(params.effort.as_deref(), Some("high"));
3244 assert_eq!(params.summary.as_deref(), Some("detailed"));
3245 assert_eq!(params.personality.as_deref(), Some("pragmatic"));
3246 assert_eq!(params.approval_policy.as_deref(), Some("never"));
3247 assert_eq!(params.sandbox_policy, Some(json!({"turn": true})));
3248 assert_eq!(
3249 params.extra.get("skipGitRepoCheck"),
3250 Some(&Value::Bool(true))
3251 );
3252 assert_eq!(
3253 params.extra.get("networkAccessEnabled"),
3254 Some(&Value::Bool(true))
3255 );
3256 assert_eq!(
3257 params.extra.get("webSearchMode"),
3258 Some(&Value::String("live".to_string()))
3259 );
3260 assert_eq!(
3261 params.extra.get("webSearchEnabled"),
3262 Some(&Value::Bool(true))
3263 );
3264 assert_eq!(
3265 params.extra.get("additionalDirectories"),
3266 Some(&json!(["/tmp/turn-dir"]))
3267 );
3268 assert_eq!(params.extra.get("customTurnFlag"), Some(&Value::Bool(true)));
3269 }
3270}