1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3
4use serde_json::{Map, Value};
5use thiserror::Error;
6use tokio::sync::{Mutex, mpsc};
7use tokio::task::JoinHandle;
8
9use crate::CodexClient;
10use crate::client::StdioConfig;
11use crate::client::WsConfig;
12use crate::error::ClientError;
13use crate::events::{ServerEvent, ServerNotification};
14use crate::protocol::shared::EmptyObject;
15use crate::protocol::{requests, responses};
16use crate::schema::OpenAiSerializable;
17
/// Approval mode accepted by [`ThreadOptions`] and [`TurnOptions`] through
/// their `approval_policy` fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApprovalMode {
    Never,
    OnRequest,
    OnFailure,
    Untrusted,
}

impl ApprovalMode {
    /// Maps each variant to its lowercase, hyphen-separated identifier.
    fn as_str(self) -> &'static str {
        match self {
            ApprovalMode::Never => "never",
            ApprovalMode::OnRequest => "on-request",
            ApprovalMode::OnFailure => "on-failure",
            ApprovalMode::Untrusted => "untrusted",
        }
    }
}
36
/// Sandbox mode accepted by [`ThreadOptions`] through its `sandbox_mode` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SandboxMode {
    ReadOnly,
    WorkspaceWrite,
    DangerFullAccess,
}

impl SandboxMode {
    /// Maps each variant to its lowercase, hyphen-separated identifier.
    fn as_str(self) -> &'static str {
        match self {
            SandboxMode::ReadOnly => "read-only",
            SandboxMode::WorkspaceWrite => "workspace-write",
            SandboxMode::DangerFullAccess => "danger-full-access",
        }
    }
}
53
/// Reasoning-effort level, from `None` up to `XHigh`.
///
/// Note that the `None` variant is a real value of this enum and is distinct
/// from `Option::None` in the `Option<ModelReasoningEffort>` fields that
/// carry it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModelReasoningEffort {
    None,
    Minimal,
    Low,
    Medium,
    High,
    XHigh,
}

impl ModelReasoningEffort {
    /// Maps each variant to its all-lowercase identifier (`XHigh` -> `"xhigh"`).
    fn as_str(self) -> &'static str {
        match self {
            ModelReasoningEffort::None => "none",
            ModelReasoningEffort::Minimal => "minimal",
            ModelReasoningEffort::Low => "low",
            ModelReasoningEffort::Medium => "medium",
            ModelReasoningEffort::High => "high",
            ModelReasoningEffort::XHigh => "xhigh",
        }
    }
}
76
/// Reasoning-summary setting carried by the `model_reasoning_summary` option
/// fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModelReasoningSummary {
    None,
    Auto,
    Concise,
    Detailed,
}

impl ModelReasoningSummary {
    /// Maps each variant to its all-lowercase identifier.
    fn as_str(self) -> &'static str {
        match self {
            ModelReasoningSummary::None => "none",
            ModelReasoningSummary::Auto => "auto",
            ModelReasoningSummary::Concise => "concise",
            ModelReasoningSummary::Detailed => "detailed",
        }
    }
}
95
/// Personality setting carried by the `personality` option fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Personality {
    None,
    Friendly,
    Pragmatic,
}

impl Personality {
    /// Maps each variant to its all-lowercase identifier.
    fn as_str(self) -> &'static str {
        match self {
            Personality::None => "none",
            Personality::Friendly => "friendly",
            Personality::Pragmatic => "pragmatic",
        }
    }
}
112
/// Web-search setting carried by the `web_search_mode` option fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebSearchMode {
    Disabled,
    Cached,
    Live,
}

impl WebSearchMode {
    /// Maps each variant to its all-lowercase identifier.
    fn as_str(self) -> &'static str {
        match self {
            WebSearchMode::Disabled => "disabled",
            WebSearchMode::Cached => "cached",
            WebSearchMode::Live => "live",
        }
    }
}
129
/// Discriminates the kind of a [`CollaborationMode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollaborationModeKind {
    Plan,
    Default,
}

impl CollaborationModeKind {
    /// Maps each variant to its all-lowercase identifier.
    fn as_str(self) -> &'static str {
        match self {
            CollaborationModeKind::Plan => "plan",
            CollaborationModeKind::Default => "default",
        }
    }
}
144
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct CollaborationModeSettings {
147 pub model: String,
148 pub reasoning_effort: Option<ModelReasoningEffort>,
149 pub developer_instructions: Option<String>,
150}
151
152impl CollaborationModeSettings {
153 pub fn new(model: impl Into<String>) -> Self {
154 Self {
155 model: model.into(),
156 reasoning_effort: None,
157 developer_instructions: None,
158 }
159 }
160
161 pub fn with_reasoning_effort(mut self, reasoning_effort: ModelReasoningEffort) -> Self {
162 self.reasoning_effort = Some(reasoning_effort);
163 self
164 }
165
166 pub fn with_developer_instructions(
167 mut self,
168 developer_instructions: impl Into<String>,
169 ) -> Self {
170 self.developer_instructions = Some(developer_instructions.into());
171 self
172 }
173}
174
175#[derive(Debug, Clone, PartialEq, Eq)]
176pub struct CollaborationMode {
177 pub mode: CollaborationModeKind,
178 pub settings: CollaborationModeSettings,
179}
180
181impl CollaborationMode {
182 pub fn new(mode: CollaborationModeKind, settings: CollaborationModeSettings) -> Self {
183 Self { mode, settings }
184 }
185
186 fn as_value(&self) -> Value {
187 let mut settings = Map::new();
188 settings.insert(
189 "model".to_string(),
190 Value::String(self.settings.model.clone()),
191 );
192 if let Some(reasoning_effort) = self.settings.reasoning_effort {
193 settings.insert(
194 "reasoning_effort".to_string(),
195 Value::String(reasoning_effort.as_str().to_string()),
196 );
197 }
198 if let Some(instructions) = &self.settings.developer_instructions {
199 settings.insert(
200 "developer_instructions".to_string(),
201 Value::String(instructions.clone()),
202 );
203 }
204
205 let mut value = Map::new();
206 value.insert(
207 "mode".to_string(),
208 Value::String(self.mode.as_str().to_string()),
209 );
210 value.insert("settings".to_string(), Value::Object(settings));
211 Value::Object(value)
212 }
213}
214
215#[derive(Debug, Clone, PartialEq)]
216pub struct DynamicToolSpec {
217 pub name: String,
218 pub description: String,
219 pub input_schema: Value,
220}
221
222impl DynamicToolSpec {
223 pub fn new(
224 name: impl Into<String>,
225 description: impl Into<String>,
226 input_schema: Value,
227 ) -> Self {
228 Self {
229 name: name.into(),
230 description: description.into(),
231 input_schema,
232 }
233 }
234
235 fn as_value(&self) -> Value {
236 let mut value = Map::new();
237 value.insert("name".to_string(), Value::String(self.name.clone()));
238 value.insert(
239 "description".to_string(),
240 Value::String(self.description.clone()),
241 );
242 value.insert("inputSchema".to_string(), self.input_schema.clone());
243 Value::Object(value)
244 }
245}
246
/// Options supplied when a thread is started or resumed through
/// [`Codex::start_thread`] / [`Codex::resume_thread`].
///
/// Every field is an `Option`, and the derived `Default` leaves all of them
/// as `None`. How an unset field is interpreted is decided by the code that
/// consumes these options (not visible in this part of the file).
#[derive(Debug, Clone, Default)]
pub struct ThreadOptions {
    pub model: Option<String>,
    pub model_provider: Option<String>,
    pub sandbox_mode: Option<SandboxMode>,
    // Free-form JSON; its schema is not constrained by this type.
    pub sandbox_policy: Option<Value>,
    pub working_directory: Option<String>,
    pub skip_git_repo_check: Option<bool>,
    pub model_reasoning_effort: Option<ModelReasoningEffort>,
    pub model_reasoning_summary: Option<ModelReasoningSummary>,
    pub network_access_enabled: Option<bool>,
    pub web_search_mode: Option<WebSearchMode>,
    pub web_search_enabled: Option<bool>,
    pub approval_policy: Option<ApprovalMode>,
    pub additional_directories: Option<Vec<String>>,
    pub personality: Option<Personality>,
    pub base_instructions: Option<String>,
    pub developer_instructions: Option<String>,
    pub ephemeral: Option<bool>,
    pub collaboration_mode: Option<CollaborationMode>,
    // Arbitrary key/value configuration entries as a JSON map.
    pub config: Option<Map<String, Value>>,
    pub dynamic_tools: Option<Vec<DynamicToolSpec>>,
    pub experimental_raw_events: Option<bool>,
    pub persist_extended_history: Option<bool>,
}

impl ThreadOptions {
    /// Returns a fresh [`ThreadOptionsBuilder`] with no option set.
    pub fn builder() -> ThreadOptionsBuilder {
        ThreadOptionsBuilder::new()
    }
}
278
279#[derive(Debug, Clone, Default)]
280pub struct ThreadOptionsBuilder {
281 options: ThreadOptions,
282}
283
284impl ThreadOptionsBuilder {
285 pub fn new() -> Self {
286 Self::default()
287 }
288
289 pub fn build(self) -> ThreadOptions {
290 self.options
291 }
292
293 pub fn model(mut self, model: impl Into<String>) -> Self {
294 self.options.model = Some(model.into());
295 self
296 }
297
298 pub fn model_provider(mut self, model_provider: impl Into<String>) -> Self {
299 self.options.model_provider = Some(model_provider.into());
300 self
301 }
302
303 pub fn sandbox_mode(mut self, sandbox_mode: SandboxMode) -> Self {
304 self.options.sandbox_mode = Some(sandbox_mode);
305 self
306 }
307
308 pub fn sandbox_policy(mut self, sandbox_policy: Value) -> Self {
309 self.options.sandbox_policy = Some(sandbox_policy);
310 self
311 }
312
313 pub fn working_directory(mut self, working_directory: impl Into<String>) -> Self {
314 self.options.working_directory = Some(working_directory.into());
315 self
316 }
317
318 pub fn skip_git_repo_check(mut self, enabled: bool) -> Self {
319 self.options.skip_git_repo_check = Some(enabled);
320 self
321 }
322
323 pub fn model_reasoning_effort(mut self, model_reasoning_effort: ModelReasoningEffort) -> Self {
324 self.options.model_reasoning_effort = Some(model_reasoning_effort);
325 self
326 }
327
328 pub fn model_reasoning_summary(
329 mut self,
330 model_reasoning_summary: ModelReasoningSummary,
331 ) -> Self {
332 self.options.model_reasoning_summary = Some(model_reasoning_summary);
333 self
334 }
335
336 pub fn network_access_enabled(mut self, enabled: bool) -> Self {
337 self.options.network_access_enabled = Some(enabled);
338 self
339 }
340
341 pub fn web_search_mode(mut self, web_search_mode: WebSearchMode) -> Self {
342 self.options.web_search_mode = Some(web_search_mode);
343 self
344 }
345
346 pub fn web_search_enabled(mut self, enabled: bool) -> Self {
347 self.options.web_search_enabled = Some(enabled);
348 self
349 }
350
351 pub fn approval_policy(mut self, approval_policy: ApprovalMode) -> Self {
352 self.options.approval_policy = Some(approval_policy);
353 self
354 }
355
356 pub fn additional_directories(mut self, additional_directories: Vec<String>) -> Self {
357 self.options.additional_directories = Some(additional_directories);
358 self
359 }
360
361 pub fn add_directory(mut self, directory: impl Into<String>) -> Self {
362 self.options
363 .additional_directories
364 .get_or_insert_with(Vec::new)
365 .push(directory.into());
366 self
367 }
368
369 pub fn personality(mut self, personality: Personality) -> Self {
370 self.options.personality = Some(personality);
371 self
372 }
373
374 pub fn base_instructions(mut self, base_instructions: impl Into<String>) -> Self {
375 self.options.base_instructions = Some(base_instructions.into());
376 self
377 }
378
379 pub fn developer_instructions(mut self, developer_instructions: impl Into<String>) -> Self {
380 self.options.developer_instructions = Some(developer_instructions.into());
381 self
382 }
383
384 pub fn ephemeral(mut self, ephemeral: bool) -> Self {
385 self.options.ephemeral = Some(ephemeral);
386 self
387 }
388
389 pub fn collaboration_mode(mut self, collaboration_mode: CollaborationMode) -> Self {
390 self.options.collaboration_mode = Some(collaboration_mode);
391 self
392 }
393
394 pub fn config(mut self, config: Map<String, Value>) -> Self {
395 self.options.config = Some(config);
396 self
397 }
398
399 pub fn insert_config(mut self, key: impl Into<String>, value: Value) -> Self {
400 self.options
401 .config
402 .get_or_insert_with(Map::new)
403 .insert(key.into(), value);
404 self
405 }
406
407 pub fn dynamic_tools(mut self, dynamic_tools: Vec<DynamicToolSpec>) -> Self {
408 self.options.dynamic_tools = Some(dynamic_tools);
409 self
410 }
411
412 pub fn experimental_raw_events(mut self, enabled: bool) -> Self {
413 self.options.experimental_raw_events = Some(enabled);
414 self
415 }
416
417 pub fn persist_extended_history(mut self, enabled: bool) -> Self {
418 self.options.persist_extended_history = Some(enabled);
419 self
420 }
421}
422
423#[derive(Debug, Clone, Default)]
424pub struct TurnOptions {
425 pub output_schema: Option<Value>,
426 pub working_directory: Option<String>,
427 pub model: Option<String>,
428 pub model_provider: Option<String>,
429 pub model_reasoning_effort: Option<ModelReasoningEffort>,
430 pub model_reasoning_summary: Option<ModelReasoningSummary>,
431 pub personality: Option<Personality>,
432 pub approval_policy: Option<ApprovalMode>,
433 pub sandbox_policy: Option<Value>,
434 pub collaboration_mode: Option<CollaborationMode>,
435 pub skip_git_repo_check: Option<bool>,
436 pub web_search_mode: Option<WebSearchMode>,
437 pub web_search_enabled: Option<bool>,
438 pub network_access_enabled: Option<bool>,
439 pub additional_directories: Option<Vec<String>>,
440 pub extra: Option<Map<String, Value>>,
441}
442
443impl TurnOptions {
444 pub fn builder() -> TurnOptionsBuilder {
445 TurnOptionsBuilder::new()
446 }
447
448 pub fn with_output_schema(mut self, output_schema: Value) -> Self {
449 self.output_schema = Some(output_schema);
450 self
451 }
452
453 pub fn with_output_schema_for<T: OpenAiSerializable>(mut self) -> Self {
454 self.output_schema = Some(T::openai_output_schema());
455 self
456 }
457
458 pub fn with_model(mut self, model: impl Into<String>) -> Self {
459 self.model = Some(model.into());
460 self
461 }
462
463 pub fn with_working_directory(mut self, working_directory: impl Into<String>) -> Self {
464 self.working_directory = Some(working_directory.into());
465 self
466 }
467}
468
469#[derive(Debug, Clone, Default)]
470pub struct TurnOptionsBuilder {
471 options: TurnOptions,
472}
473
474impl TurnOptionsBuilder {
475 pub fn new() -> Self {
476 Self::default()
477 }
478
479 pub fn build(self) -> TurnOptions {
480 self.options
481 }
482
483 pub fn output_schema(mut self, output_schema: Value) -> Self {
484 self.options.output_schema = Some(output_schema);
485 self
486 }
487
488 pub fn output_schema_for<T: OpenAiSerializable>(mut self) -> Self {
489 self.options.output_schema = Some(T::openai_output_schema());
490 self
491 }
492
493 pub fn clear_output_schema(mut self) -> Self {
494 self.options.output_schema = None;
495 self
496 }
497
498 pub fn working_directory(mut self, working_directory: impl Into<String>) -> Self {
499 self.options.working_directory = Some(working_directory.into());
500 self
501 }
502
503 pub fn model(mut self, model: impl Into<String>) -> Self {
504 self.options.model = Some(model.into());
505 self
506 }
507
508 pub fn model_provider(mut self, model_provider: impl Into<String>) -> Self {
509 self.options.model_provider = Some(model_provider.into());
510 self
511 }
512
513 pub fn model_reasoning_effort(mut self, model_reasoning_effort: ModelReasoningEffort) -> Self {
514 self.options.model_reasoning_effort = Some(model_reasoning_effort);
515 self
516 }
517
518 pub fn model_reasoning_summary(
519 mut self,
520 model_reasoning_summary: ModelReasoningSummary,
521 ) -> Self {
522 self.options.model_reasoning_summary = Some(model_reasoning_summary);
523 self
524 }
525
526 pub fn personality(mut self, personality: Personality) -> Self {
527 self.options.personality = Some(personality);
528 self
529 }
530
531 pub fn approval_policy(mut self, approval_policy: ApprovalMode) -> Self {
532 self.options.approval_policy = Some(approval_policy);
533 self
534 }
535
536 pub fn sandbox_policy(mut self, sandbox_policy: Value) -> Self {
537 self.options.sandbox_policy = Some(sandbox_policy);
538 self
539 }
540
541 pub fn collaboration_mode(mut self, collaboration_mode: CollaborationMode) -> Self {
542 self.options.collaboration_mode = Some(collaboration_mode);
543 self
544 }
545
546 pub fn skip_git_repo_check(mut self, enabled: bool) -> Self {
547 self.options.skip_git_repo_check = Some(enabled);
548 self
549 }
550
551 pub fn web_search_mode(mut self, web_search_mode: WebSearchMode) -> Self {
552 self.options.web_search_mode = Some(web_search_mode);
553 self
554 }
555
556 pub fn web_search_enabled(mut self, enabled: bool) -> Self {
557 self.options.web_search_enabled = Some(enabled);
558 self
559 }
560
561 pub fn network_access_enabled(mut self, enabled: bool) -> Self {
562 self.options.network_access_enabled = Some(enabled);
563 self
564 }
565
566 pub fn additional_directories(mut self, additional_directories: Vec<String>) -> Self {
567 self.options.additional_directories = Some(additional_directories);
568 self
569 }
570
571 pub fn add_directory(mut self, directory: impl Into<String>) -> Self {
572 self.options
573 .additional_directories
574 .get_or_insert_with(Vec::new)
575 .push(directory.into());
576 self
577 }
578
579 pub fn extra(mut self, extra: Map<String, Value>) -> Self {
580 self.options.extra = Some(extra);
581 self
582 }
583
584 pub fn insert_extra(mut self, key: impl Into<String>, value: Value) -> Self {
585 self.options
586 .extra
587 .get_or_insert_with(Map::new)
588 .insert(key.into(), value);
589 self
590 }
591}
592
/// Error payload consisting of a single `message` string.
///
/// Used by [`ThreadEvent::TurnFailed`] and by the `error` field of
/// [`McpToolCallItem`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadError {
    pub message: String,
}
597
/// Error type returned by [`Codex::ask`] and [`Codex::ask_with_options`].
#[derive(Debug, Error)]
pub enum ThreadRunError {
    /// A wrapped [`ClientError`]; `Display` and `source` are forwarded to it
    /// (`transparent`), and `#[from]` lets `?` convert a `ClientError` into
    /// this variant.
    #[error(transparent)]
    Client(#[from] ClientError),
    /// A turn failure; displays as the contained `message`.
    #[error("{message}")]
    TurnFailed { message: String },
}
605
/// One piece of user input: either text or a local image path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UserInput {
    Text { text: String },
    LocalImage { path: String },
}

/// Input accepted by the `ask`-style entry points: a plain string or a list
/// of [`UserInput`] items.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Input {
    Text(String),
    Items(Vec<UserInput>),
}

impl Input {
    /// Builds [`Input::Text`] from anything convertible into a `String`.
    pub fn text(text: impl Into<String>) -> Self {
        let text: String = text.into();
        Input::Text(text)
    }

    /// Builds [`Input::Items`] from the given list, kept in order.
    pub fn items(items: Vec<UserInput>) -> Self {
        Input::Items(items)
    }
}

/// An owned string becomes [`Input::Text`] without copying.
impl From<String> for Input {
    fn from(value: String) -> Self {
        Input::Text(value)
    }
}

/// A borrowed string is copied into [`Input::Text`].
impl From<&str> for Input {
    fn from(value: &str) -> Self {
        Input::Text(value.to_owned())
    }
}

/// A list of items becomes [`Input::Items`].
impl From<Vec<UserInput>> for Input {
    fn from(value: Vec<UserInput>) -> Self {
        Input::Items(value)
    }
}
645
/// Token counters attached to a completed turn (see [`Turn`] and
/// [`ThreadEvent::TurnCompleted`]).
///
/// All three counters are signed 64-bit integers. NOTE(review): whether
/// `cached_input_tokens` is a subset of `input_tokens` is not visible here —
/// confirm against the server's usage semantics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Usage {
    pub input_tokens: i64,
    pub cached_input_tokens: i64,
    pub output_tokens: i64,
}
652
/// Result of one turn: the list of [`ThreadItem`]s, a final response string,
/// and optional [`Usage`] counters.
#[derive(Debug, Clone, PartialEq)]
pub struct Turn {
    pub items: Vec<ThreadItem>,
    pub final_response: String,
    // `None` when no usage was recorded for the turn.
    pub usage: Option<Usage>,
}
659
/// Alias for [`Turn`]; the two names refer to the same type.
pub type RunResult = Turn;
661
/// Handle to a turn whose events are delivered over a channel.
///
/// Owns the receiving half of a tokio `mpsc` channel together with the
/// `JoinHandle` of a spawned task (presumably the task that feeds the
/// channel — confirm where `StreamedTurn` is constructed).
pub struct StreamedTurn {
    receiver: mpsc::Receiver<Result<ThreadEvent, ClientError>>,
    task: JoinHandle<()>,
}

impl StreamedTurn {
    /// Waits for the next value on the channel.
    ///
    /// Yields `Some(Ok(event))` or `Some(Err(error))` for each value sent, and
    /// whatever `Receiver::recv` returns otherwise (`None` once the channel
    /// is closed and empty).
    pub async fn next_event(&mut self) -> Option<Result<ThreadEvent, ClientError>> {
        self.receiver.recv().await
    }
}

impl Drop for StreamedTurn {
    /// Requests cancellation of the associated task when the handle is
    /// dropped.
    fn drop(&mut self) {
        self.task.abort();
    }
}
678
/// Event produced while a turn is streamed (see [`StreamedTurn::next_event`]).
#[derive(Debug, Clone, PartialEq)]
pub enum ThreadEvent {
    /// Carries the identifier of the thread.
    ThreadStarted { thread_id: String },
    TurnStarted,
    /// Carries the turn's usage counters when available.
    TurnCompleted { usage: Option<Usage> },
    /// Carries the failure as a [`ThreadError`].
    TurnFailed { error: ThreadError },
    ItemStarted { item: ThreadItem },
    ItemUpdated { item: ThreadItem },
    ItemCompleted { item: ThreadItem },
    /// Carries an error message only (no [`ThreadError`] wrapper).
    Error { message: String },
}
690
/// Phase tag attached to an [`AgentMessageItem`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentMessagePhase {
    Commentary,
    FinalAnswer,
    Unknown,
}

impl AgentMessagePhase {
    /// Maps each variant to its lowercase, underscore-separated identifier.
    pub fn as_str(self) -> &'static str {
        match self {
            AgentMessagePhase::Commentary => "commentary",
            AgentMessagePhase::FinalAnswer => "final_answer",
            AgentMessagePhase::Unknown => "unknown",
        }
    }
}
707
708#[derive(Debug, Clone, PartialEq, Eq)]
709pub struct AgentMessageItem {
710 pub id: String,
711 pub text: String,
712 pub phase: Option<AgentMessagePhase>,
713}
714
715impl AgentMessageItem {
716 pub fn is_final_answer(&self) -> bool {
717 matches!(self.phase, Some(AgentMessagePhase::FinalAnswer))
718 }
719}
720
/// One content element of a [`UserMessageItem`].
#[derive(Debug, Clone, PartialEq)]
pub enum UserMessageContentItem {
    Text { text: String },
    Image { url: String },
    LocalImage { path: String },
    /// Holds a raw JSON value (presumably content of an unrecognized kind —
    /// confirm at the parsing site).
    Unknown(Value),
}
728
/// A user message: identifier plus its list of content elements.
#[derive(Debug, Clone, PartialEq)]
pub struct UserMessageItem {
    pub id: String,
    pub content: Vec<UserMessageContentItem>,
}
734
/// A plan item: identifier plus text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlanItem {
    pub id: String,
    pub text: String,
}
740
/// A reasoning item: identifier plus text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReasoningItem {
    pub id: String,
    pub text: String,
}
746
/// Status of a [`CommandExecutionItem`].
///
/// `Unknown` is presumably the fallback for unrecognized status values —
/// confirm at the parsing site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommandExecutionStatus {
    InProgress,
    Completed,
    Failed,
    Declined,
    Unknown,
}
755
/// A command execution: the command string, its aggregated output, an
/// optional exit code, and a status.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandExecutionItem {
    pub id: String,
    pub command: String,
    pub aggregated_output: String,
    // `None` when no exit code is available.
    pub exit_code: Option<i32>,
    pub status: CommandExecutionStatus,
}
764
/// Kind of change recorded in a [`FileUpdateChange`].
///
/// `Unknown` is presumably the fallback for unrecognized kinds — confirm at
/// the parsing site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PatchChangeKind {
    Add,
    Delete,
    Update,
    Unknown,
}
772
/// A single file change: the affected path and the kind of change.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileUpdateChange {
    pub path: String,
    pub kind: PatchChangeKind,
}
778
/// Status of a [`FileChangeItem`].
///
/// `Unknown` is presumably the fallback for unrecognized status values —
/// confirm at the parsing site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PatchApplyStatus {
    InProgress,
    Completed,
    Failed,
    Declined,
    Unknown,
}
787
/// A file-change item: identifier, the list of per-file changes, and a status.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileChangeItem {
    pub id: String,
    pub changes: Vec<FileUpdateChange>,
    pub status: PatchApplyStatus,
}
794
/// Status of a [`McpToolCallItem`].
///
/// Unlike the command and patch statuses there is no `Declined` variant.
/// `Unknown` is presumably the fallback for unrecognized status values —
/// confirm at the parsing site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum McpToolCallStatus {
    InProgress,
    Completed,
    Failed,
    Unknown,
}
802
/// An MCP tool call: the server and tool names, JSON arguments, and the
/// optional result or error, plus a status.
#[derive(Debug, Clone, PartialEq)]
pub struct McpToolCallItem {
    pub id: String,
    pub server: String,
    pub tool: String,
    pub arguments: Value,
    // `result` and `error` are independent options; this type does not
    // enforce that exactly one of them is set.
    pub result: Option<Value>,
    pub error: Option<ThreadError>,
    pub status: McpToolCallStatus,
}
813
/// A dynamic tool call: tool name, JSON arguments, and outcome details.
///
/// `status` is kept as a free-form string here rather than an enum.
#[derive(Debug, Clone, PartialEq)]
pub struct DynamicToolCallItem {
    pub id: String,
    pub tool: String,
    pub arguments: Value,
    pub status: String,
    pub content_items: Vec<Value>,
    pub success: Option<bool>,
    // Duration in milliseconds, going by the field name.
    pub duration_ms: Option<u64>,
}
824
/// A collaboration tool call between threads.
///
/// Only `id`, `tool`, `status` and `sender_thread_id` are always present; the
/// remaining fields are optional. `status` and `agent_status` are free-form
/// strings here rather than enums.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollabToolCallItem {
    pub id: String,
    pub tool: String,
    pub status: String,
    pub sender_thread_id: String,
    pub receiver_thread_id: Option<String>,
    pub new_thread_id: Option<String>,
    pub prompt: Option<String>,
    pub agent_status: Option<String>,
}
836
/// A web-search item: identifier plus the query string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WebSearchItem {
    pub id: String,
    pub query: String,
}
842
/// An image-view item: identifier plus the image path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImageViewItem {
    pub id: String,
    pub path: String,
}
848
/// Payload shared by [`ThreadItem::EnteredReviewMode`] and
/// [`ThreadItem::ExitedReviewMode`]: identifier plus a `review` string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReviewModeItem {
    pub id: String,
    pub review: String,
}
854
/// A context-compaction item; it carries only an identifier.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContextCompactionItem {
    pub id: String,
}
859
/// One entry of a [`TodoListItem`]: its text and a completion flag.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TodoItem {
    pub text: String,
    pub completed: bool,
}
865
/// A to-do list: identifier plus its entries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TodoListItem {
    pub id: String,
    pub items: Vec<TodoItem>,
}
871
/// An error item: identifier plus a message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErrorItem {
    pub id: String,
    pub message: String,
}
877
/// Payload of [`ThreadItem::Unknown`]: the raw JSON value plus whatever `id`
/// and item type could be determined (both optional).
#[derive(Debug, Clone, PartialEq)]
pub struct UnknownItem {
    pub id: Option<String>,
    pub item_type: Option<String>,
    pub raw: Value,
}
884
/// One item of a thread, tagged by kind.
///
/// Each variant wraps the payload struct of the matching name;
/// `EnteredReviewMode` and `ExitedReviewMode` share [`ReviewModeItem`], and
/// `Unknown` keeps the raw JSON in an [`UnknownItem`].
#[derive(Debug, Clone, PartialEq)]
pub enum ThreadItem {
    AgentMessage(AgentMessageItem),
    UserMessage(UserMessageItem),
    Plan(PlanItem),
    Reasoning(ReasoningItem),
    CommandExecution(CommandExecutionItem),
    FileChange(FileChangeItem),
    McpToolCall(McpToolCallItem),
    DynamicToolCall(DynamicToolCallItem),
    CollabToolCall(CollabToolCallItem),
    WebSearch(WebSearchItem),
    ImageView(ImageViewItem),
    EnteredReviewMode(ReviewModeItem),
    ExitedReviewMode(ReviewModeItem),
    ContextCompaction(ContextCompactionItem),
    TodoList(TodoListItem),
    Error(ErrorItem),
    Unknown(UnknownItem),
}
905
// Generates a `pub async fn` for use inside `impl Codex`: the method first
// makes sure the handle is initialized, then forwards `params` to the
// same-named method on the wrapped client and returns its result.
macro_rules! codex_forward_typed_method {
    ($name:ident, $request:ty, $response:ty) => {
        pub async fn $name(&self, params: $request) -> Result<$response, ClientError> {
            self.ensure_initialized().await?;
            let client = &self.inner.client;
            client.$name(params).await
        }
    };
}
914
// Parameterless counterpart of the typed forwarding macro: generates a
// `pub async fn` that makes sure the handle is initialized and then calls the
// same-named zero-argument method on the wrapped client.
macro_rules! codex_forward_null_method {
    ($name:ident, $response:ty) => {
        pub async fn $name(&self) -> Result<$response, ClientError> {
            self.ensure_initialized().await?;
            let client = &self.inner.client;
            client.$name().await
        }
    };
}
923
/// High-level handle around a [`CodexClient`].
///
/// Cloning is cheap: the state lives behind an `Arc`, so every clone shares
/// the same client and the same initialization flag.
#[derive(Clone)]
pub struct Codex {
    inner: Arc<CodexInner>,
}
928
/// State shared by all clones of a `Codex` handle.
struct CodexInner {
    client: CodexClient,
    // Parameters sent with the `initialize` request.
    initialize_params: requests::InitializeParams,
    // Set to `true` once the initialization handshake has completed.
    initialized: AtomicBool,
    // Serializes concurrent initialization attempts (async mutex, so it may
    // be held across `.await`).
    initialize_lock: Mutex<()>,
}
935
936impl Codex {
937 pub fn with_initialize_params(
938 client: CodexClient,
939 initialize_params: requests::InitializeParams,
940 ) -> Self {
941 Self {
942 inner: Arc::new(CodexInner {
943 client,
944 initialize_params,
945 initialized: AtomicBool::new(false),
946 initialize_lock: Mutex::new(()),
947 }),
948 }
949 }
950
951 pub fn from_client(client: CodexClient) -> Self {
952 let initialize_params = requests::InitializeParams::new(requests::ClientInfo::new(
953 "codex_sdk_rs",
954 "Codex Rust SDK",
955 env!("CARGO_PKG_VERSION"),
956 ));
957 Self::with_initialize_params(client, initialize_params)
958 }
959
960 pub async fn spawn_stdio(config: StdioConfig) -> Result<Self, ClientError> {
961 let client = CodexClient::spawn_stdio(config).await?;
962 Ok(Self::from_client(client))
963 }
964
965 pub async fn connect_ws(config: WsConfig) -> Result<Self, ClientError> {
966 let client = CodexClient::connect_ws(config).await?;
967 Ok(Self::from_client(client))
968 }
969
970 pub async fn start_and_connect_ws(config: WsConfig) -> Result<Self, ClientError> {
971 let client = CodexClient::start_and_connect_ws(config).await?;
972 Ok(Self::from_client(client))
973 }
974
975 pub fn start_thread(&self, options: ThreadOptions) -> Thread {
976 Thread {
977 codex: self.clone(),
978 id: None,
979 needs_resume: false,
980 last_turn_id: None,
981 options,
982 }
983 }
984
985 pub fn resume_thread(&self, id: impl Into<String>, options: ThreadOptions) -> Thread {
986 Thread {
987 codex: self.clone(),
988 id: Some(id.into()),
989 needs_resume: true,
990 last_turn_id: None,
991 options,
992 }
993 }
994
995 pub async fn ask(&self, input: impl Into<Input>) -> Result<String, ThreadRunError> {
997 self.ask_with_options(input, ThreadOptions::default(), TurnOptions::default())
998 .await
999 }
1000
    /// Asks a single question on a freshly created thread.
    ///
    /// Builds a new [`Thread`] handle from `thread_options` via
    /// [`Codex::start_thread`] and delegates to that thread's `ask` with
    /// `input` and `turn_options`. The thread handle is dropped when this
    /// call returns.
    ///
    /// # Errors
    /// Returns the [`ThreadRunError`] produced by the thread's `ask`.
    pub async fn ask_with_options(
        &self,
        input: impl Into<Input>,
        thread_options: ThreadOptions,
        turn_options: TurnOptions,
    ) -> Result<String, ThreadRunError> {
        let mut thread = self.start_thread(thread_options);
        thread.ask(input, turn_options).await
    }
1011
1012 pub async fn thread_list(
1014 &self,
1015 params: requests::ThreadListParams,
1016 ) -> Result<responses::ThreadListResult, ClientError> {
1017 self.ensure_initialized().await?;
1018 self.inner.client.thread_list(params).await
1019 }
1020
    // Typed request forwarders. Each invocation below expands to
    // `pub async fn <name>(&self, params: <Params>) -> Result<<Result>, ClientError>`,
    // which ensures the handle is initialized and then calls the same-named
    // method on the wrapped client.

    // --- Threads ---
    codex_forward_typed_method!(
        thread_start,
        requests::ThreadStartParams,
        responses::ThreadResult
    );
    codex_forward_typed_method!(
        thread_resume,
        requests::ThreadResumeParams,
        responses::ThreadResult
    );
    codex_forward_typed_method!(
        thread_fork,
        requests::ThreadForkParams,
        responses::ThreadResult
    );
    codex_forward_typed_method!(
        thread_archive,
        requests::ThreadArchiveParams,
        responses::ThreadArchiveResult
    );
    codex_forward_typed_method!(
        thread_name_set,
        requests::ThreadSetNameParams,
        responses::ThreadSetNameResult
    );
    codex_forward_typed_method!(
        thread_unarchive,
        requests::ThreadUnarchiveParams,
        responses::ThreadUnarchiveResult
    );
    codex_forward_typed_method!(
        thread_compact_start,
        requests::ThreadCompactStartParams,
        responses::ThreadCompactStartResult
    );
    codex_forward_typed_method!(
        thread_background_terminals_clean,
        requests::ThreadBackgroundTerminalsCleanParams,
        responses::ThreadBackgroundTerminalsCleanResult
    );
    codex_forward_typed_method!(
        thread_rollback,
        requests::ThreadRollbackParams,
        responses::ThreadRollbackResult
    );
    codex_forward_typed_method!(
        thread_loaded_list,
        requests::ThreadLoadedListParams,
        responses::ThreadLoadedListResult
    );
    codex_forward_typed_method!(
        thread_read,
        requests::ThreadReadParams,
        responses::ThreadReadResult
    );
    // --- Skills and apps ---
    codex_forward_typed_method!(
        skills_list,
        requests::SkillsListParams,
        responses::SkillsListResult
    );
    // Note: `skills_remote_list` / `skills_remote_export` use the
    // `SkillsRemoteRead*` / `SkillsRemoteWrite*` types.
    codex_forward_typed_method!(
        skills_remote_list,
        requests::SkillsRemoteReadParams,
        responses::SkillsRemoteReadResult
    );
    codex_forward_typed_method!(
        skills_remote_export,
        requests::SkillsRemoteWriteParams,
        responses::SkillsRemoteWriteResult
    );
    codex_forward_typed_method!(
        app_list,
        requests::AppsListParams,
        responses::AppsListResult
    );
    codex_forward_typed_method!(
        skills_config_write,
        requests::SkillsConfigWriteParams,
        responses::SkillsConfigWriteResult
    );
    // --- Turns and reviews ---
    codex_forward_typed_method!(turn_start, requests::TurnStartParams, responses::TurnResult);
    codex_forward_typed_method!(
        turn_steer,
        requests::TurnSteerParams,
        responses::TurnSteerResult
    );
    codex_forward_typed_method!(turn_interrupt, requests::TurnInterruptParams, EmptyObject);
    codex_forward_typed_method!(
        review_start,
        requests::ReviewStartParams,
        responses::ReviewStartResult
    );
    // --- Models, features, collaboration modes ---
    codex_forward_typed_method!(
        model_list,
        requests::ModelListParams,
        responses::ModelListResult
    );
    codex_forward_typed_method!(
        experimental_feature_list,
        requests::ExperimentalFeatureListParams,
        responses::ExperimentalFeatureListResult
    );
    codex_forward_typed_method!(
        collaboration_mode_list,
        requests::CollaborationModeListParams,
        responses::CollaborationModeListResult
    );
    codex_forward_typed_method!(
        mock_experimental_method,
        requests::MockExperimentalMethodParams,
        responses::MockExperimentalMethodResult
    );
    // --- MCP servers and sandbox setup ---
    codex_forward_typed_method!(
        mcp_server_oauth_login,
        requests::McpServerOauthLoginParams,
        responses::McpServerOauthLoginResult
    );
    codex_forward_typed_method!(
        mcp_server_status_list,
        requests::ListMcpServerStatusParams,
        responses::McpServerStatusListResult
    );
    codex_forward_typed_method!(
        windows_sandbox_setup_start,
        requests::WindowsSandboxSetupStartParams,
        responses::WindowsSandboxSetupStartResult
    );
    // --- Account, feedback, command execution ---
    codex_forward_typed_method!(
        account_login_start,
        requests::LoginAccountParams,
        responses::LoginAccountResult
    );
    codex_forward_typed_method!(
        account_login_cancel,
        requests::CancelLoginAccountParams,
        EmptyObject
    );
    codex_forward_typed_method!(
        feedback_upload,
        requests::FeedbackUploadParams,
        responses::FeedbackUploadResult
    );
    codex_forward_typed_method!(
        command_exec,
        requests::CommandExecParams,
        responses::CommandExecResult
    );
    // --- Configuration ---
    codex_forward_typed_method!(
        config_read,
        requests::ConfigReadParams,
        responses::ConfigReadResult
    );
    codex_forward_typed_method!(
        config_value_write,
        requests::ConfigValueWriteParams,
        responses::ConfigValueWriteResult
    );
    codex_forward_typed_method!(
        config_batch_write,
        requests::ConfigBatchWriteParams,
        responses::ConfigBatchWriteResult
    );
    codex_forward_typed_method!(
        account_read,
        requests::GetAccountParams,
        responses::GetAccountResult
    );
    // --- Fuzzy file search sessions ---
    codex_forward_typed_method!(
        fuzzy_file_search_session_start,
        requests::FuzzyFileSearchSessionStartParams,
        responses::FuzzyFileSearchSessionStartResult
    );
    codex_forward_typed_method!(
        fuzzy_file_search_session_update,
        requests::FuzzyFileSearchSessionUpdateParams,
        responses::FuzzyFileSearchSessionUpdateResult
    );
    codex_forward_typed_method!(
        fuzzy_file_search_session_stop,
        requests::FuzzyFileSearchSessionStopParams,
        responses::FuzzyFileSearchSessionStopResult
    );
1203
    /// Alias for `skills_remote_list`: forwards `params` unchanged and
    /// returns its result. The name matches the `SkillsRemoteRead*` types.
    pub async fn skills_remote_read(
        &self,
        params: requests::SkillsRemoteReadParams,
    ) -> Result<responses::SkillsRemoteReadResult, ClientError> {
        self.skills_remote_list(params).await
    }
1210
    /// Alias for `skills_remote_export`: forwards `params` unchanged and
    /// returns its result. The name matches the `SkillsRemoteWrite*` types.
    pub async fn skills_remote_write(
        &self,
        params: requests::SkillsRemoteWriteParams,
    ) -> Result<responses::SkillsRemoteWriteResult, ClientError> {
        self.skills_remote_export(params).await
    }
1217
    // Parameterless request forwarders. Each invocation expands to
    // `pub async fn <name>(&self) -> Result<<Result>, ClientError>`, which
    // ensures the handle is initialized and then calls the same-named
    // zero-argument method on the wrapped client.
    codex_forward_null_method!(config_mcp_server_reload, EmptyObject);
    codex_forward_null_method!(account_logout, EmptyObject);
    codex_forward_null_method!(
        account_rate_limits_read,
        responses::AccountRateLimitsReadResult
    );
    codex_forward_null_method!(
        config_requirements_read,
        responses::ConfigRequirementsReadResult
    );
1228
1229 pub async fn send_raw_request(
1230 &self,
1231 method: impl Into<String>,
1232 params: Value,
1233 timeout: Option<std::time::Duration>,
1234 ) -> Result<Value, ClientError> {
1235 self.ensure_initialized().await?;
1236 self.inner
1237 .client
1238 .send_raw_request(method, params, timeout)
1239 .await
1240 }
1241
1242 pub async fn send_raw_notification(
1243 &self,
1244 method: impl Into<String>,
1245 params: Value,
1246 ) -> Result<(), ClientError> {
1247 self.ensure_initialized().await?;
1248 self.inner
1249 .client
1250 .send_raw_notification(method, params)
1251 .await
1252 }
1253
1254 pub fn client(&self) -> CodexClient {
1255 self.inner.client.clone()
1256 }
1257
1258 async fn ensure_initialized(&self) -> Result<(), ClientError> {
1259 if self.inner.initialized.load(Ordering::SeqCst) {
1260 return Ok(());
1261 }
1262
1263 let _guard = self.inner.initialize_lock.lock().await;
1264 if self.inner.initialized.load(Ordering::SeqCst) {
1265 return Ok(());
1266 }
1267
1268 match self
1269 .inner
1270 .client
1271 .initialize(self.inner.initialize_params.clone())
1272 .await
1273 {
1274 Ok(_) | Err(ClientError::AlreadyInitialized) => {}
1275 Err(err) => return Err(err),
1276 }
1277
1278 self.inner.client.initialized().await?;
1279 self.inner.initialized.store(true, Ordering::SeqCst);
1280 Ok(())
1281 }
1282}
1283
/// Handle for one conversation thread, driven through a `Codex` instance.
1284pub struct Thread {
// Client wrapper used for every request this thread issues.
1285 codex: Codex,
// Thread id; `None` until a `thread_start` response has been recorded.
1286 id: Option<String>,
// When set (and `id` is present), `thread_resume` is called before the next
// request and the flag is cleared.
1287 needs_resume: bool,
// Id of the most recent turn started by `run_streamed`; cleared whenever the
// thread is started or resumed.
1288 last_turn_id: Option<String>,
// Thread-level options used when building start, resume and turn params.
1289 options: ThreadOptions,
1290}
1291
1292impl Thread {
1293 pub fn id(&self) -> Option<&str> {
1294 self.id.as_deref()
1295 }
1296
1297 async fn ensure_thread_started_or_resumed(
1298 &mut self,
1299 ) -> Result<(String, Option<String>), ClientError> {
1300 self.codex.ensure_initialized().await?;
1301
1302 let mut emit_thread_started = None;
1303 if self.id.is_none() {
1304 let thread = self
1305 .codex
1306 .inner
1307 .client
1308 .thread_start(build_thread_start_params(&self.options))
1309 .await?;
1310 self.id = Some(thread.thread.id.clone());
1311 emit_thread_started = Some(thread.thread.id);
1312 self.needs_resume = false;
1313 self.last_turn_id = None;
1314 } else if self.needs_resume {
1315 let resume_params =
1316 build_thread_resume_params(&self.id.clone().unwrap_or_default(), &self.options);
1317 let resumed = self.codex.inner.client.thread_resume(resume_params).await?;
1318 self.id = Some(resumed.thread.id);
1319 self.needs_resume = false;
1320 self.last_turn_id = None;
1321 }
1322
1323 let thread_id = self.id.clone().ok_or_else(|| {
1324 ClientError::TransportSend("thread id unavailable after start/resume".to_string())
1325 })?;
1326
1327 Ok((thread_id, emit_thread_started))
1328 }
1329
1330 pub async fn set_name(
1331 &mut self,
1332 name: impl Into<String>,
1333 ) -> Result<responses::ThreadSetNameResult, ClientError> {
1334 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1335 self.codex
1336 .inner
1337 .client
1338 .thread_name_set(requests::ThreadSetNameParams {
1339 thread_id,
1340 name: name.into(),
1341 extra: Map::new(),
1342 })
1343 .await
1344 }
1345
1346 pub async fn read(
1347 &mut self,
1348 include_turns: Option<bool>,
1349 ) -> Result<responses::ThreadReadResult, ClientError> {
1350 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1351 self.codex
1352 .inner
1353 .client
1354 .thread_read(requests::ThreadReadParams {
1355 thread_id,
1356 include_turns,
1357 extra: Map::new(),
1358 })
1359 .await
1360 }
1361
1362 pub async fn archive(&mut self) -> Result<responses::ThreadArchiveResult, ClientError> {
1363 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1364 self.codex
1365 .inner
1366 .client
1367 .thread_archive(requests::ThreadArchiveParams {
1368 thread_id,
1369 extra: Map::new(),
1370 })
1371 .await
1372 }
1373
1374 pub async fn unarchive(&mut self) -> Result<responses::ThreadUnarchiveResult, ClientError> {
1375 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1376 self.codex
1377 .inner
1378 .client
1379 .thread_unarchive(requests::ThreadUnarchiveParams {
1380 thread_id,
1381 extra: Map::new(),
1382 })
1383 .await
1384 }
1385
1386 pub async fn rollback(
1387 &mut self,
1388 count: u32,
1389 ) -> Result<responses::ThreadRollbackResult, ClientError> {
1390 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1391 self.codex
1392 .inner
1393 .client
1394 .thread_rollback(requests::ThreadRollbackParams {
1395 thread_id,
1396 count,
1397 extra: Map::new(),
1398 })
1399 .await
1400 }
1401
1402 pub async fn compact_start(
1403 &mut self,
1404 ) -> Result<responses::ThreadCompactStartResult, ClientError> {
1405 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1406 self.codex
1407 .inner
1408 .client
1409 .thread_compact_start(requests::ThreadCompactStartParams {
1410 thread_id,
1411 extra: Map::new(),
1412 })
1413 .await
1414 }
1415
1416 pub async fn steer(
1417 &mut self,
1418 input: impl Into<Input>,
1419 expected_turn_id: Option<String>,
1420 ) -> Result<responses::TurnSteerResult, ClientError> {
1421 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1422 let expected_turn_id = expected_turn_id.or_else(|| self.last_turn_id.clone());
1423 let expected_turn_id = expected_turn_id.ok_or_else(|| {
1424 ClientError::TransportSend(
1425 "turn/steer requires expected_turn_id or a previously started turn".to_string(),
1426 )
1427 })?;
1428 self.codex
1429 .inner
1430 .client
1431 .turn_steer(requests::TurnSteerParams {
1432 thread_id,
1433 input: normalize_input(input.into()),
1434 expected_turn_id: Some(expected_turn_id),
1435 extra: Map::new(),
1436 })
1437 .await
1438 }
1439
1440 pub async fn interrupt(&mut self, turn_id: impl Into<String>) -> Result<(), ClientError> {
1441 let (thread_id, _) = self.ensure_thread_started_or_resumed().await?;
1442 self.codex
1443 .inner
1444 .client
1445 .turn_interrupt(requests::TurnInterruptParams {
1446 thread_id,
1447 turn_id: turn_id.into(),
1448 extra: Map::new(),
1449 })
1450 .await?;
1451 Ok(())
1452 }
1453
1454 pub async fn run_streamed(
1455 &mut self,
1456 input: impl Into<Input>,
1457 turn_options: TurnOptions,
1458 ) -> Result<StreamedTurn, ClientError> {
1459 let (thread_id, emit_thread_started) = self.ensure_thread_started_or_resumed().await?;
1460
1461 let server_events = self.codex.inner.client.subscribe();
1462
1463 let turn_response = self
1464 .codex
1465 .inner
1466 .client
1467 .turn_start(build_turn_start_params(
1468 &thread_id,
1469 input.into(),
1470 &self.options,
1471 &turn_options,
1472 ))
1473 .await?;
1474 let turn_id = turn_response.turn.id;
1475 self.last_turn_id = Some(turn_id.clone());
1476
1477 let (tx, rx) = mpsc::channel(256);
1478
1479 if let Some(started_thread_id) = emit_thread_started {
1480 if tx
1481 .send(Ok(ThreadEvent::ThreadStarted {
1482 thread_id: started_thread_id,
1483 }))
1484 .await
1485 .is_err()
1486 {
1487 return Err(ClientError::TransportClosed);
1488 }
1489 }
1490
1491 if tx.send(Ok(ThreadEvent::TurnStarted)).await.is_err() {
1492 return Err(ClientError::TransportClosed);
1493 }
1494
1495 let task = tokio::spawn(async move {
1496 pump_turn_events(server_events, tx, thread_id, turn_id).await;
1497 });
1498
1499 Ok(StreamedTurn { receiver: rx, task })
1500 }
1501
1502 pub async fn run(
1503 &mut self,
1504 input: impl Into<Input>,
1505 turn_options: TurnOptions,
1506 ) -> Result<Turn, ThreadRunError> {
1507 let mut streamed = self.run_streamed(input, turn_options).await?;
1508 let mut items = Vec::new();
1509 let mut final_answer = None;
1510 let mut fallback_response = None;
1511 let mut usage = None;
1512 let mut saw_terminal = false;
1513
1514 while let Some(next) = streamed.next_event().await {
1515 let event = next.map_err(ThreadRunError::Client)?;
1516 match event {
1517 ThreadEvent::ItemCompleted { item } => {
1518 update_final_response_candidates(
1519 &item,
1520 &mut final_answer,
1521 &mut fallback_response,
1522 );
1523 items.push(item);
1524 }
1525 ThreadEvent::TurnCompleted { usage: completed } => {
1526 usage = completed;
1527 saw_terminal = true;
1528 break;
1529 }
1530 ThreadEvent::TurnFailed { error } => {
1531 return Err(ThreadRunError::TurnFailed {
1532 message: error.message,
1533 });
1534 }
1535 ThreadEvent::Error { message } => {
1536 return Err(ThreadRunError::TurnFailed { message });
1537 }
1538 ThreadEvent::ThreadStarted { .. }
1539 | ThreadEvent::TurnStarted
1540 | ThreadEvent::ItemStarted { .. }
1541 | ThreadEvent::ItemUpdated { .. } => {}
1542 }
1543 }
1544
1545 if !saw_terminal {
1546 return Err(ThreadRunError::Client(ClientError::TransportClosed));
1547 }
1548
1549 Ok(Turn {
1550 items,
1551 final_response: final_answer.or(fallback_response).unwrap_or_default(),
1552 usage,
1553 })
1554 }
1555
1556 pub async fn ask(
1558 &mut self,
1559 input: impl Into<Input>,
1560 turn_options: TurnOptions,
1561 ) -> Result<String, ThreadRunError> {
1562 let turn = self.run(input, turn_options).await?;
1563 Ok(turn.final_response)
1564 }
1565}
1566
1567async fn pump_turn_events(
1568 mut server_events: tokio::sync::broadcast::Receiver<ServerEvent>,
1569 tx: mpsc::Sender<Result<ThreadEvent, ClientError>>,
1570 thread_id: String,
1571 turn_id: String,
1572) {
1573 let mut latest_usage: Option<Usage> = None;
1574
1575 loop {
1576 let next = server_events.recv().await;
1577 let server_event = match next {
1578 Ok(event) => event,
1579 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
1580 let _ = tx.send(Err(ClientError::TransportClosed)).await;
1581 break;
1582 }
1583 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
1584 continue;
1585 }
1586 };
1587
1588 match server_event {
1589 ServerEvent::Notification(notification) => match notification {
1590 ServerNotification::ItemStarted(payload)
1591 if matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) =>
1592 {
1593 if tx
1594 .send(Ok(ThreadEvent::ItemStarted {
1595 item: parse_thread_item(payload.item),
1596 }))
1597 .await
1598 .is_err()
1599 {
1600 break;
1601 }
1602 }
1603 ServerNotification::ItemCompleted(payload)
1604 if matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) =>
1605 {
1606 if tx
1607 .send(Ok(ThreadEvent::ItemCompleted {
1608 item: parse_thread_item(payload.item),
1609 }))
1610 .await
1611 .is_err()
1612 {
1613 break;
1614 }
1615 }
1616 ServerNotification::ItemAgentMessageDelta(delta)
1617 if matches_target_from_extra(&delta.extra, &thread_id, Some(&turn_id)) =>
1618 {
1619 let text = delta.delta.or(delta.text).unwrap_or_default();
1620 if text.is_empty() {
1621 continue;
1622 }
1623 let item = ThreadItem::AgentMessage(AgentMessageItem {
1624 id: delta.item_id.unwrap_or_default(),
1625 text,
1626 phase: None,
1627 });
1628 if tx
1629 .send(Ok(ThreadEvent::ItemUpdated { item }))
1630 .await
1631 .is_err()
1632 {
1633 break;
1634 }
1635 }
1636 ServerNotification::ItemPlanDelta(delta)
1637 if matches_target_from_extra(&delta.extra, &thread_id, Some(&turn_id)) =>
1638 {
1639 let text = delta.delta.or(delta.text).unwrap_or_default();
1640 if text.is_empty() {
1641 continue;
1642 }
1643 let item = ThreadItem::Plan(PlanItem {
1644 id: delta.item_id.unwrap_or_default(),
1645 text,
1646 });
1647 if tx
1648 .send(Ok(ThreadEvent::ItemUpdated { item }))
1649 .await
1650 .is_err()
1651 {
1652 break;
1653 }
1654 }
1655 ServerNotification::TurnCompleted(payload)
1656 if payload.turn.id == turn_id
1657 && matches_target_from_extra(
1658 &payload.turn.extra,
1659 &thread_id,
1660 Some(&turn_id),
1661 ) =>
1662 {
1663 let status = payload.turn.status.unwrap_or_default().to_ascii_lowercase();
1664 if status == "failed" {
1665 let message = payload
1666 .turn
1667 .error
1668 .map(|error| error.message)
1669 .unwrap_or_else(|| "turn failed".to_string());
1670 let _ = tx
1671 .send(Ok(ThreadEvent::TurnFailed {
1672 error: ThreadError { message },
1673 }))
1674 .await;
1675 break;
1676 }
1677
1678 let usage = parse_usage_from_turn_extra(&payload.turn.extra)
1679 .or_else(|| latest_usage.clone());
1680 let _ = tx.send(Ok(ThreadEvent::TurnCompleted { usage })).await;
1681 break;
1682 }
1683 ServerNotification::ThreadTokenUsageUpdated(payload)
1684 if payload
1685 .thread_id
1686 .as_deref()
1687 .is_none_or(|incoming| incoming == thread_id) =>
1688 {
1689 if !matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) {
1690 continue;
1691 }
1692 latest_usage = payload
1693 .usage
1694 .as_ref()
1695 .and_then(parse_usage_from_value)
1696 .or_else(|| {
1697 payload
1698 .extra
1699 .get("tokenUsage")
1700 .and_then(parse_usage_from_value)
1701 });
1702 }
1703 ServerNotification::Error(payload)
1704 if matches_target_from_extra(&payload.extra, &thread_id, Some(&turn_id)) =>
1705 {
1706 let _ = tx
1707 .send(Ok(ThreadEvent::Error {
1708 message: payload.error.message,
1709 }))
1710 .await;
1711 }
1712 _ => {}
1713 },
1714 ServerEvent::TransportClosed => {
1715 let _ = tx.send(Err(ClientError::TransportClosed)).await;
1716 break;
1717 }
1718 ServerEvent::ServerRequest(_) => {}
1719 }
1720 }
1721}
1722
1723fn update_final_response_candidates(
1724 item: &ThreadItem,
1725 final_answer: &mut Option<String>,
1726 fallback_response: &mut Option<String>,
1727) {
1728 let ThreadItem::AgentMessage(agent_message) = item else {
1729 return;
1730 };
1731
1732 if agent_message.is_final_answer() {
1733 *final_answer = Some(agent_message.text.clone());
1734 } else {
1735 *fallback_response = Some(agent_message.text.clone());
1736 }
1737}
1738
1739fn build_thread_start_params(options: &ThreadOptions) -> requests::ThreadStartParams {
1740 let mut extra = Map::new();
1741 if let Some(skip) = options.skip_git_repo_check {
1742 extra.insert("skipGitRepoCheck".to_string(), Value::Bool(skip));
1743 }
1744 if let Some(mode) = options.web_search_mode {
1745 extra.insert(
1746 "webSearchMode".to_string(),
1747 Value::String(mode.as_str().to_string()),
1748 );
1749 }
1750 if let Some(enabled) = options.web_search_enabled {
1751 extra.insert("webSearchEnabled".to_string(), Value::Bool(enabled));
1752 }
1753 if let Some(network) = options.network_access_enabled {
1754 extra.insert("networkAccessEnabled".to_string(), Value::Bool(network));
1755 }
1756 if let Some(additional) = &options.additional_directories {
1757 extra.insert(
1758 "additionalDirectories".to_string(),
1759 Value::Array(
1760 additional
1761 .iter()
1762 .map(|entry| Value::String(entry.clone()))
1763 .collect(),
1764 ),
1765 );
1766 }
1767 if let Some(config) = &options.config {
1768 extra.insert("config".to_string(), Value::Object(config.clone()));
1769 }
1770 if let Some(dynamic_tools) = &options.dynamic_tools {
1771 extra.insert(
1772 "dynamicTools".to_string(),
1773 Value::Array(
1774 dynamic_tools
1775 .iter()
1776 .map(DynamicToolSpec::as_value)
1777 .collect(),
1778 ),
1779 );
1780 }
1781 if let Some(enabled) = options.experimental_raw_events {
1782 extra.insert("experimentalRawEvents".to_string(), Value::Bool(enabled));
1783 }
1784 if let Some(enabled) = options.persist_extended_history {
1785 extra.insert("persistExtendedHistory".to_string(), Value::Bool(enabled));
1786 }
1787
1788 requests::ThreadStartParams {
1789 model: options.model.clone(),
1790 model_provider: options.model_provider.clone(),
1791 cwd: options.working_directory.clone(),
1792 approval_policy: options
1793 .approval_policy
1794 .map(|mode| mode.as_str().to_string()),
1795 sandbox: options.sandbox_mode.map(|mode| mode.as_str().to_string()),
1796 sandbox_policy: options.sandbox_policy.clone(),
1797 effort: options
1798 .model_reasoning_effort
1799 .map(|effort| effort.as_str().to_string()),
1800 summary: options
1801 .model_reasoning_summary
1802 .map(|summary| summary.as_str().to_string()),
1803 personality: options.personality.map(|value| value.as_str().to_string()),
1804 ephemeral: options.ephemeral,
1805 base_instructions: options.base_instructions.clone(),
1806 developer_instructions: options.developer_instructions.clone(),
1807 extra,
1808 }
1809}
1810
1811fn build_thread_resume_params(
1812 thread_id: &str,
1813 options: &ThreadOptions,
1814) -> requests::ThreadResumeParams {
1815 let mut extra = Map::new();
1816 if let Some(skip) = options.skip_git_repo_check {
1817 extra.insert("skipGitRepoCheck".to_string(), Value::Bool(skip));
1818 }
1819 if let Some(mode) = options.web_search_mode {
1820 extra.insert(
1821 "webSearchMode".to_string(),
1822 Value::String(mode.as_str().to_string()),
1823 );
1824 }
1825 if let Some(enabled) = options.web_search_enabled {
1826 extra.insert("webSearchEnabled".to_string(), Value::Bool(enabled));
1827 }
1828 if let Some(network) = options.network_access_enabled {
1829 extra.insert("networkAccessEnabled".to_string(), Value::Bool(network));
1830 }
1831 if let Some(additional) = &options.additional_directories {
1832 extra.insert(
1833 "additionalDirectories".to_string(),
1834 Value::Array(
1835 additional
1836 .iter()
1837 .map(|entry| Value::String(entry.clone()))
1838 .collect(),
1839 ),
1840 );
1841 }
1842 if let Some(policy) = &options.sandbox_policy {
1843 extra.insert("sandboxPolicy".to_string(), policy.clone());
1844 }
1845 if let Some(effort) = options.model_reasoning_effort {
1846 extra.insert(
1847 "effort".to_string(),
1848 Value::String(effort.as_str().to_string()),
1849 );
1850 }
1851 if let Some(summary) = options.model_reasoning_summary {
1852 extra.insert(
1853 "summary".to_string(),
1854 Value::String(summary.as_str().to_string()),
1855 );
1856 }
1857 if let Some(ephemeral) = options.ephemeral {
1858 extra.insert("ephemeral".to_string(), Value::Bool(ephemeral));
1859 }
1860 if let Some(collaboration_mode) = &options.collaboration_mode {
1861 extra.insert(
1862 "collaborationMode".to_string(),
1863 collaboration_mode.as_value(),
1864 );
1865 }
1866 if let Some(dynamic_tools) = &options.dynamic_tools {
1867 extra.insert(
1868 "dynamicTools".to_string(),
1869 Value::Array(
1870 dynamic_tools
1871 .iter()
1872 .map(DynamicToolSpec::as_value)
1873 .collect(),
1874 ),
1875 );
1876 }
1877 if let Some(enabled) = options.experimental_raw_events {
1878 extra.insert("experimentalRawEvents".to_string(), Value::Bool(enabled));
1879 }
1880
1881 requests::ThreadResumeParams {
1882 thread_id: thread_id.to_string(),
1883 history: None,
1884 path: None,
1885 model: options.model.clone(),
1886 model_provider: options.model_provider.clone(),
1887 cwd: options.working_directory.clone(),
1888 approval_policy: options
1889 .approval_policy
1890 .map(|mode| mode.as_str().to_string()),
1891 sandbox: options.sandbox_mode.map(|mode| mode.as_str().to_string()),
1892 config: options.config.clone(),
1893 base_instructions: options.base_instructions.clone(),
1894 developer_instructions: options.developer_instructions.clone(),
1895 personality: options.personality.map(|value| value.as_str().to_string()),
1896 persist_extended_history: options.persist_extended_history,
1897 extra,
1898 }
1899}
1900
1901fn build_turn_start_params(
1902 thread_id: &str,
1903 input: Input,
1904 options: &ThreadOptions,
1905 turn_options: &TurnOptions,
1906) -> requests::TurnStartParams {
1907 let mut extra = Map::new();
1908 if let Some(skip) = turn_options
1909 .skip_git_repo_check
1910 .or(options.skip_git_repo_check)
1911 {
1912 extra.insert("skipGitRepoCheck".to_string(), Value::Bool(skip));
1913 }
1914 if let Some(mode) = turn_options.web_search_mode.or(options.web_search_mode) {
1915 extra.insert(
1916 "webSearchMode".to_string(),
1917 Value::String(mode.as_str().to_string()),
1918 );
1919 }
1920 if let Some(enabled) = turn_options
1921 .web_search_enabled
1922 .or(options.web_search_enabled)
1923 {
1924 extra.insert("webSearchEnabled".to_string(), Value::Bool(enabled));
1925 }
1926 if let Some(network) = turn_options
1927 .network_access_enabled
1928 .or(options.network_access_enabled)
1929 {
1930 extra.insert("networkAccessEnabled".to_string(), Value::Bool(network));
1931 }
1932 if let Some(additional) = turn_options
1933 .additional_directories
1934 .as_ref()
1935 .or(options.additional_directories.as_ref())
1936 {
1937 extra.insert(
1938 "additionalDirectories".to_string(),
1939 Value::Array(
1940 additional
1941 .iter()
1942 .map(|entry| Value::String(entry.clone()))
1943 .collect(),
1944 ),
1945 );
1946 }
1947 if let Some(collaboration_mode) = turn_options
1948 .collaboration_mode
1949 .as_ref()
1950 .or(options.collaboration_mode.as_ref())
1951 {
1952 extra.insert(
1953 "collaborationMode".to_string(),
1954 collaboration_mode.as_value(),
1955 );
1956 }
1957 if let Some(extra_overrides) = &turn_options.extra {
1958 for (key, value) in extra_overrides {
1959 extra.insert(key.clone(), value.clone());
1960 }
1961 }
1962
1963 requests::TurnStartParams {
1964 thread_id: thread_id.to_string(),
1965 input: normalize_input(input),
1966 cwd: turn_options
1967 .working_directory
1968 .clone()
1969 .or_else(|| options.working_directory.clone()),
1970 model: turn_options.model.clone().or_else(|| options.model.clone()),
1971 model_provider: turn_options
1972 .model_provider
1973 .clone()
1974 .or_else(|| options.model_provider.clone()),
1975 effort: turn_options
1976 .model_reasoning_effort
1977 .or(options.model_reasoning_effort)
1978 .map(|effort| effort.as_str().to_string()),
1979 summary: turn_options
1980 .model_reasoning_summary
1981 .or(options.model_reasoning_summary)
1982 .map(|summary| summary.as_str().to_string()),
1983 personality: turn_options
1984 .personality
1985 .or(options.personality)
1986 .map(|value| value.as_str().to_string()),
1987 output_schema: turn_options.output_schema.clone(),
1988 approval_policy: turn_options
1989 .approval_policy
1990 .or(options.approval_policy)
1991 .map(|mode| mode.as_str().to_string()),
1992 sandbox_policy: turn_options
1993 .sandbox_policy
1994 .clone()
1995 .or_else(|| options.sandbox_policy.clone()),
1996 collaboration_mode: None,
1997 extra,
1998 }
1999}
2000
2001fn normalize_input(input: Input) -> Vec<requests::TurnInputItem> {
2002 match input {
2003 Input::Text(text) => vec![requests::TurnInputItem::Text { text }],
2004 Input::Items(items) => {
2005 let mut text_parts = Vec::new();
2006 let mut normalized = Vec::new();
2007
2008 for item in items {
2009 match item {
2010 UserInput::Text { text } => text_parts.push(text),
2011 UserInput::LocalImage { path } => {
2012 normalized.push(requests::TurnInputItem::LocalImage { path });
2013 }
2014 }
2015 }
2016
2017 if !text_parts.is_empty() {
2018 normalized.insert(
2019 0,
2020 requests::TurnInputItem::Text {
2021 text: text_parts.join("\n\n"),
2022 },
2023 );
2024 }
2025
2026 normalized
2027 }
2028 }
2029}
2030
2031fn matches_target_from_extra(
2032 extra: &Map<String, Value>,
2033 thread_id: &str,
2034 turn_id: Option<&str>,
2035) -> bool {
2036 let thread_matches = extra
2037 .get("threadId")
2038 .and_then(Value::as_str)
2039 .map(|incoming| incoming == thread_id)
2040 .unwrap_or(true);
2041
2042 let turn_matches = match turn_id {
2043 Some(target_turn_id) => extra
2044 .get("turnId")
2045 .and_then(Value::as_str)
2046 .map(|incoming| incoming == target_turn_id)
2047 .unwrap_or(true),
2048 None => true,
2049 };
2050
2051 thread_matches && turn_matches
2052}
2053
2054fn parse_usage_from_turn_extra(extra: &Map<String, Value>) -> Option<Usage> {
2055 extra
2056 .get("usage")
2057 .and_then(parse_usage_from_value)
2058 .or_else(|| extra.get("tokenUsage").and_then(parse_usage_from_value))
2059}
2060
2061fn parse_usage_from_value(value: &Value) -> Option<Usage> {
2062 let object = value.as_object()?;
2063 if let Some(last) = object.get("last") {
2064 return parse_usage_from_value(last);
2065 }
2066
2067 let input_tokens = object.get("inputTokens").and_then(Value::as_i64)?;
2068 let cached_input_tokens = object
2069 .get("cachedInputTokens")
2070 .and_then(Value::as_i64)
2071 .unwrap_or(0);
2072 let output_tokens = object.get("outputTokens").and_then(Value::as_i64)?;
2073
2074 Some(Usage {
2075 input_tokens,
2076 cached_input_tokens,
2077 output_tokens,
2078 })
2079}
2080
2081fn parse_thread_item(item: Value) -> ThreadItem {
2082 let object = match item.as_object() {
2083 Some(object) => object,
2084 None => {
2085 return ThreadItem::Unknown(UnknownItem {
2086 id: None,
2087 item_type: None,
2088 raw: item,
2089 });
2090 }
2091 };
2092
2093 let item_type = object
2094 .get("type")
2095 .and_then(Value::as_str)
2096 .map(|value| value.to_string());
2097 let id = object
2098 .get("id")
2099 .and_then(Value::as_str)
2100 .map(|value| value.to_string());
2101
2102 match item_type.as_deref() {
2103 Some("agentMessage") => ThreadItem::AgentMessage(AgentMessageItem {
2104 id: id.unwrap_or_default(),
2105 text: object
2106 .get("text")
2107 .and_then(Value::as_str)
2108 .unwrap_or_default()
2109 .to_string(),
2110 phase: object
2111 .get("phase")
2112 .and_then(Value::as_str)
2113 .map(parse_agent_message_phase),
2114 }),
2115 Some("userMessage") => ThreadItem::UserMessage(UserMessageItem {
2116 id: id.unwrap_or_default(),
2117 content: object
2118 .get("content")
2119 .and_then(Value::as_array)
2120 .map(|entries| {
2121 entries
2122 .iter()
2123 .map(parse_user_message_content_item)
2124 .collect()
2125 })
2126 .unwrap_or_default(),
2127 }),
2128 Some("plan") => ThreadItem::Plan(PlanItem {
2129 id: id.unwrap_or_default(),
2130 text: object
2131 .get("text")
2132 .and_then(Value::as_str)
2133 .unwrap_or_default()
2134 .to_string(),
2135 }),
2136 Some("reasoning") => ThreadItem::Reasoning(ReasoningItem {
2137 id: id.unwrap_or_default(),
2138 text: parse_reasoning_text(object),
2139 }),
2140 Some("commandExecution") => ThreadItem::CommandExecution(CommandExecutionItem {
2141 id: id.unwrap_or_default(),
2142 command: object
2143 .get("command")
2144 .and_then(Value::as_str)
2145 .unwrap_or_default()
2146 .to_string(),
2147 aggregated_output: object
2148 .get("aggregatedOutput")
2149 .and_then(Value::as_str)
2150 .unwrap_or_default()
2151 .to_string(),
2152 exit_code: object
2153 .get("exitCode")
2154 .and_then(Value::as_i64)
2155 .map(|value| value as i32),
2156 status: parse_command_execution_status(
2157 object
2158 .get("status")
2159 .and_then(Value::as_str)
2160 .unwrap_or_default(),
2161 ),
2162 }),
2163 Some("fileChange") => ThreadItem::FileChange(FileChangeItem {
2164 id: id.unwrap_or_default(),
2165 changes: object
2166 .get("changes")
2167 .and_then(Value::as_array)
2168 .map(|changes| {
2169 changes
2170 .iter()
2171 .filter_map(parse_file_update_change)
2172 .collect::<Vec<_>>()
2173 })
2174 .unwrap_or_default(),
2175 status: parse_patch_apply_status(
2176 object
2177 .get("status")
2178 .and_then(Value::as_str)
2179 .unwrap_or_default(),
2180 ),
2181 }),
2182 Some("mcpToolCall") => {
2183 let error = object
2184 .get("error")
2185 .and_then(Value::as_object)
2186 .and_then(|error| error.get("message"))
2187 .and_then(Value::as_str)
2188 .map(|message| ThreadError {
2189 message: message.to_string(),
2190 });
2191
2192 ThreadItem::McpToolCall(McpToolCallItem {
2193 id: id.unwrap_or_default(),
2194 server: object
2195 .get("server")
2196 .and_then(Value::as_str)
2197 .unwrap_or_default()
2198 .to_string(),
2199 tool: object
2200 .get("tool")
2201 .and_then(Value::as_str)
2202 .unwrap_or_default()
2203 .to_string(),
2204 arguments: object.get("arguments").cloned().unwrap_or(Value::Null),
2205 result: object.get("result").cloned(),
2206 error,
2207 status: parse_mcp_tool_call_status(
2208 object
2209 .get("status")
2210 .and_then(Value::as_str)
2211 .unwrap_or_default(),
2212 ),
2213 })
2214 }
2215 Some("dynamicToolCall") => ThreadItem::DynamicToolCall(DynamicToolCallItem {
2216 id: id.unwrap_or_default(),
2217 tool: object
2218 .get("tool")
2219 .and_then(Value::as_str)
2220 .unwrap_or_default()
2221 .to_string(),
2222 arguments: object.get("arguments").cloned().unwrap_or(Value::Null),
2223 status: object
2224 .get("status")
2225 .and_then(Value::as_str)
2226 .unwrap_or_default()
2227 .to_string(),
2228 content_items: object
2229 .get("contentItems")
2230 .and_then(Value::as_array)
2231 .cloned()
2232 .unwrap_or_default(),
2233 success: object.get("success").and_then(Value::as_bool),
2234 duration_ms: object.get("durationMs").and_then(Value::as_u64),
2235 }),
2236 Some("collabToolCall") => ThreadItem::CollabToolCall(CollabToolCallItem {
2237 id: id.unwrap_or_default(),
2238 tool: object
2239 .get("tool")
2240 .and_then(Value::as_str)
2241 .unwrap_or_default()
2242 .to_string(),
2243 status: object
2244 .get("status")
2245 .and_then(Value::as_str)
2246 .unwrap_or_default()
2247 .to_string(),
2248 sender_thread_id: object
2249 .get("senderThreadId")
2250 .and_then(Value::as_str)
2251 .unwrap_or_default()
2252 .to_string(),
2253 receiver_thread_id: object
2254 .get("receiverThreadId")
2255 .and_then(Value::as_str)
2256 .map(str::to_string),
2257 new_thread_id: object
2258 .get("newThreadId")
2259 .and_then(Value::as_str)
2260 .map(str::to_string),
2261 prompt: object
2262 .get("prompt")
2263 .and_then(Value::as_str)
2264 .map(str::to_string),
2265 agent_status: object
2266 .get("agentStatus")
2267 .and_then(Value::as_str)
2268 .map(str::to_string),
2269 }),
2270 Some("webSearch") => ThreadItem::WebSearch(WebSearchItem {
2271 id: id.unwrap_or_default(),
2272 query: object
2273 .get("query")
2274 .and_then(Value::as_str)
2275 .unwrap_or_default()
2276 .to_string(),
2277 }),
2278 Some("imageView") => ThreadItem::ImageView(ImageViewItem {
2279 id: id.unwrap_or_default(),
2280 path: object
2281 .get("path")
2282 .and_then(Value::as_str)
2283 .unwrap_or_default()
2284 .to_string(),
2285 }),
2286 Some("enteredReviewMode") => ThreadItem::EnteredReviewMode(ReviewModeItem {
2287 id: id.unwrap_or_default(),
2288 review: object
2289 .get("review")
2290 .and_then(Value::as_str)
2291 .unwrap_or_default()
2292 .to_string(),
2293 }),
2294 Some("exitedReviewMode") => ThreadItem::ExitedReviewMode(ReviewModeItem {
2295 id: id.unwrap_or_default(),
2296 review: object
2297 .get("review")
2298 .and_then(Value::as_str)
2299 .unwrap_or_default()
2300 .to_string(),
2301 }),
2302 Some("contextCompaction") => ThreadItem::ContextCompaction(ContextCompactionItem {
2303 id: id.unwrap_or_default(),
2304 }),
2305 Some("todoList") => ThreadItem::TodoList(TodoListItem {
2306 id: id.unwrap_or_default(),
2307 items: object
2308 .get("items")
2309 .and_then(Value::as_array)
2310 .map(|items| items.iter().filter_map(parse_todo_item).collect())
2311 .unwrap_or_default(),
2312 }),
2313 Some("error") => ThreadItem::Error(ErrorItem {
2314 id: id.unwrap_or_default(),
2315 message: object
2316 .get("message")
2317 .and_then(Value::as_str)
2318 .unwrap_or_default()
2319 .to_string(),
2320 }),
2321 _ => ThreadItem::Unknown(UnknownItem {
2322 id,
2323 item_type,
2324 raw: item,
2325 }),
2326 }
2327}
2328
2329fn parse_agent_message_phase(value: &str) -> AgentMessagePhase {
2330 match value {
2331 "commentary" => AgentMessagePhase::Commentary,
2332 "final_answer" => AgentMessagePhase::FinalAnswer,
2333 _ => AgentMessagePhase::Unknown,
2334 }
2335}
2336
2337fn parse_user_message_content_item(value: &Value) -> UserMessageContentItem {
2338 let Some(object) = value.as_object() else {
2339 return UserMessageContentItem::Unknown(value.clone());
2340 };
2341
2342 match object.get("type").and_then(Value::as_str) {
2343 Some("text") => UserMessageContentItem::Text {
2344 text: object
2345 .get("text")
2346 .and_then(Value::as_str)
2347 .unwrap_or_default()
2348 .to_string(),
2349 },
2350 Some("image") => UserMessageContentItem::Image {
2351 url: object
2352 .get("url")
2353 .and_then(Value::as_str)
2354 .unwrap_or_default()
2355 .to_string(),
2356 },
2357 Some("localImage") => UserMessageContentItem::LocalImage {
2358 path: object
2359 .get("path")
2360 .and_then(Value::as_str)
2361 .unwrap_or_default()
2362 .to_string(),
2363 },
2364 _ => UserMessageContentItem::Unknown(value.clone()),
2365 }
2366}
2367
2368fn parse_reasoning_text(object: &Map<String, Value>) -> String {
2369 if let Some(text) = object.get("text").and_then(Value::as_str) {
2370 return text.to_string();
2371 }
2372
2373 let summary = object
2374 .get("summary")
2375 .and_then(Value::as_array)
2376 .map(|entries| {
2377 entries
2378 .iter()
2379 .filter_map(Value::as_str)
2380 .collect::<Vec<_>>()
2381 .join("\n")
2382 })
2383 .unwrap_or_default();
2384
2385 let content = object
2386 .get("content")
2387 .and_then(Value::as_array)
2388 .map(|entries| {
2389 entries
2390 .iter()
2391 .filter_map(Value::as_str)
2392 .collect::<Vec<_>>()
2393 .join("\n")
2394 })
2395 .unwrap_or_default();
2396
2397 if summary.is_empty() {
2398 content
2399 } else if content.is_empty() {
2400 summary
2401 } else {
2402 format!("{summary}\n{content}")
2403 }
2404}
2405
2406fn parse_file_update_change(change: &Value) -> Option<FileUpdateChange> {
2407 let object = change.as_object()?;
2408 let kind = match object.get("kind") {
2409 Some(Value::String(kind)) => parse_patch_change_kind(kind),
2410 Some(Value::Object(kind_object)) => kind_object
2411 .get("type")
2412 .and_then(Value::as_str)
2413 .map(parse_patch_change_kind)
2414 .unwrap_or(PatchChangeKind::Unknown),
2415 _ => PatchChangeKind::Unknown,
2416 };
2417
2418 Some(FileUpdateChange {
2419 path: object
2420 .get("path")
2421 .and_then(Value::as_str)
2422 .unwrap_or_default()
2423 .to_string(),
2424 kind,
2425 })
2426}
2427
2428fn parse_todo_item(value: &Value) -> Option<TodoItem> {
2429 let object = value.as_object()?;
2430 Some(TodoItem {
2431 text: object
2432 .get("text")
2433 .and_then(Value::as_str)
2434 .unwrap_or_default()
2435 .to_string(),
2436 completed: object
2437 .get("completed")
2438 .and_then(Value::as_bool)
2439 .unwrap_or(false),
2440 })
2441}
2442
2443fn parse_command_execution_status(status: &str) -> CommandExecutionStatus {
2444 match status {
2445 "inProgress" | "in_progress" => CommandExecutionStatus::InProgress,
2446 "completed" => CommandExecutionStatus::Completed,
2447 "failed" => CommandExecutionStatus::Failed,
2448 "declined" => CommandExecutionStatus::Declined,
2449 _ => CommandExecutionStatus::Unknown,
2450 }
2451}
2452
2453fn parse_patch_change_kind(kind: &str) -> PatchChangeKind {
2454 match kind {
2455 "add" => PatchChangeKind::Add,
2456 "delete" => PatchChangeKind::Delete,
2457 "update" => PatchChangeKind::Update,
2458 _ => PatchChangeKind::Unknown,
2459 }
2460}
2461
2462fn parse_patch_apply_status(status: &str) -> PatchApplyStatus {
2463 match status {
2464 "inProgress" | "in_progress" => PatchApplyStatus::InProgress,
2465 "completed" => PatchApplyStatus::Completed,
2466 "failed" => PatchApplyStatus::Failed,
2467 "declined" => PatchApplyStatus::Declined,
2468 _ => PatchApplyStatus::Unknown,
2469 }
2470}
2471
2472fn parse_mcp_tool_call_status(status: &str) -> McpToolCallStatus {
2473 match status {
2474 "inProgress" | "in_progress" => McpToolCallStatus::InProgress,
2475 "completed" => McpToolCallStatus::Completed,
2476 "failed" => McpToolCallStatus::Failed,
2477 _ => McpToolCallStatus::Unknown,
2478 }
2479}
2480
2481#[cfg(test)]
2482mod tests {
2483 use super::*;
2484 use schemars::JsonSchema;
2485 use serde::{Deserialize, Serialize};
2486 use serde_json::json;
2487
2488 #[derive(
2489 Debug,
2490 Clone,
2491 PartialEq,
2492 Eq,
2493 Serialize,
2494 Deserialize,
2495 JsonSchema,
2496 codex_app_server_sdk::OpenAiSerializable,
2497 )]
2498 struct StructuredReply {
2499 answer: String,
2500 }
2501
/// Two text inputs followed by two local images normalise to three items:
/// the texts merged with a blank line between them, then each image in order.
#[test]
fn normalize_input_combines_text_and_images() {
    let inputs = vec![
        UserInput::Text {
            text: "first".to_string(),
        },
        UserInput::Text {
            text: "second".to_string(),
        },
        UserInput::LocalImage {
            path: "/tmp/one.png".to_string(),
        },
        UserInput::LocalImage {
            path: "/tmp/two.png".to_string(),
        },
    ];
    let normalized = normalize_input(Input::Items(inputs));

    assert_eq!(normalized.len(), 3);

    // The first slot holds the merged text.
    match &normalized[0] {
        requests::TurnInputItem::Text { text } => assert_eq!(text, "first\n\nsecond"),
        other => panic!("expected text input item, got {other:?}"),
    }

    // The remaining slots hold the images, in their original order.
    let expected_paths = ["/tmp/one.png", "/tmp/two.png"];
    for (item, expected_path) in normalized[1..].iter().zip(expected_paths) {
        match item {
            requests::TurnInputItem::LocalImage { path } => assert_eq!(path, expected_path),
            other => panic!("expected local image input item, got {other:?}"),
        }
    }
}
2533
/// An `agentMessage` payload parses into `ThreadItem::AgentMessage` with its
/// id, text and phase carried over.
#[test]
fn parse_agent_message_item() {
    let raw = json!({
        "id": "item_1",
        "type": "agentMessage",
        "text": "hello",
        "phase": "final_answer"
    });
    let expected = ThreadItem::AgentMessage(AgentMessageItem {
        id: "item_1".to_string(),
        text: "hello".to_string(),
        phase: Some(AgentMessagePhase::FinalAnswer),
    });

    assert_eq!(parse_thread_item(raw), expected);
}
2552
/// Table-driven check that each listed item type parses into its dedicated
/// `ThreadItem` variant with every field populated from the payload.
#[test]
fn parse_missing_documented_thread_item_variants() {
    let cases = vec![
        // userMessage with mixed content
        (
            json!({
                "id": "user_1",
                "type": "userMessage",
                "content": [
                    { "type": "text", "text": "hello" },
                    { "type": "localImage", "path": "/tmp/example.png" }
                ]
            }),
            ThreadItem::UserMessage(UserMessageItem {
                id: "user_1".to_string(),
                content: vec![
                    UserMessageContentItem::Text {
                        text: "hello".to_string(),
                    },
                    UserMessageContentItem::LocalImage {
                        path: "/tmp/example.png".to_string(),
                    },
                ],
            }),
        ),
        // plan
        (
            json!({
                "id": "plan_1",
                "type": "plan",
                "text": "1. inspect\n2. patch"
            }),
            ThreadItem::Plan(PlanItem {
                id: "plan_1".to_string(),
                text: "1. inspect\n2. patch".to_string(),
            }),
        ),
        // dynamicToolCall
        (
            json!({
                "id": "tool_1",
                "type": "dynamicToolCall",
                "tool": "tool/search",
                "arguments": { "q": "rust" },
                "status": "completed",
                "contentItems": [{ "type": "text", "text": "done" }],
                "success": true,
                "durationMs": 12
            }),
            ThreadItem::DynamicToolCall(DynamicToolCallItem {
                id: "tool_1".to_string(),
                tool: "tool/search".to_string(),
                arguments: json!({ "q": "rust" }),
                status: "completed".to_string(),
                content_items: vec![json!({ "type": "text", "text": "done" })],
                success: Some(true),
                duration_ms: Some(12),
            }),
        ),
        // collabToolCall
        (
            json!({
                "id": "collab_1",
                "type": "collabToolCall",
                "tool": "delegate",
                "status": "completed",
                "senderThreadId": "thr_a",
                "receiverThreadId": "thr_b",
                "newThreadId": "thr_c",
                "prompt": "review this",
                "agentStatus": "idle"
            }),
            ThreadItem::CollabToolCall(CollabToolCallItem {
                id: "collab_1".to_string(),
                tool: "delegate".to_string(),
                status: "completed".to_string(),
                sender_thread_id: "thr_a".to_string(),
                receiver_thread_id: Some("thr_b".to_string()),
                new_thread_id: Some("thr_c".to_string()),
                prompt: Some("review this".to_string()),
                agent_status: Some("idle".to_string()),
            }),
        ),
        // imageView
        (
            json!({
                "id": "image_1",
                "type": "imageView",
                "path": "/tmp/example.jpg"
            }),
            ThreadItem::ImageView(ImageViewItem {
                id: "image_1".to_string(),
                path: "/tmp/example.jpg".to_string(),
            }),
        ),
        // enteredReviewMode
        (
            json!({
                "id": "review_1",
                "type": "enteredReviewMode",
                "review": "current changes"
            }),
            ThreadItem::EnteredReviewMode(ReviewModeItem {
                id: "review_1".to_string(),
                review: "current changes".to_string(),
            }),
        ),
        // exitedReviewMode
        (
            json!({
                "id": "review_2",
                "type": "exitedReviewMode",
                "review": "looks good"
            }),
            ThreadItem::ExitedReviewMode(ReviewModeItem {
                id: "review_2".to_string(),
                review: "looks good".to_string(),
            }),
        ),
        // contextCompaction
        (
            json!({
                "id": "compact_1",
                "type": "contextCompaction"
            }),
            ThreadItem::ContextCompaction(ContextCompactionItem {
                id: "compact_1".to_string(),
            }),
        ),
    ];

    for (payload, want) in cases {
        let got = parse_thread_item(payload);
        assert_eq!(got, want);
    }
}
2680
/// After a commentary message followed by a final-answer message, the
/// commentary text is kept as the fallback and the final answer is preferred.
#[test]
fn final_response_prefers_final_answer_phase() {
    let mut final_answer = None;
    let mut fallback_response = None;

    // Feed the messages in arrival order: commentary first, final answer second.
    let messages = [
        ("msg_1", "thinking", AgentMessagePhase::Commentary),
        ("msg_2", "final", AgentMessagePhase::FinalAnswer),
    ];
    for (id, text, phase) in messages {
        let item = ThreadItem::AgentMessage(AgentMessageItem {
            id: id.to_string(),
            text: text.to_string(),
            phase: Some(phase),
        });
        update_final_response_candidates(&item, &mut final_answer, &mut fallback_response);
    }

    assert_eq!(fallback_response.as_deref(), Some("thinking"));
    assert_eq!(final_answer.as_deref(), Some("final"));

    let chosen = final_answer.or(fallback_response);
    assert_eq!(chosen, Some("final".to_string()));
}
2712
/// Token counts nested under the payload's `last` key are read into `Usage`.
#[test]
fn parse_usage_from_token_usage_payload() {
    let payload = json!({
        "last": {
            "inputTokens": 10,
            "cachedInputTokens": 2,
            "outputTokens": 7
        }
    });
    let expected = Usage {
        input_tokens: 10,
        cached_input_tokens: 2,
        output_tokens: 7,
    };

    assert_eq!(parse_usage_from_value(&payload), Some(expected));
}
2732
/// An unrecognised item type parses to `ThreadItem::Unknown`, which keeps the
/// id, the type string and the untouched raw payload.
#[test]
fn parse_unknown_item_preserves_payload() {
    let raw = json!({
        "id": "x",
        "type": "newItemType",
        "payload": {"a": 1}
    });

    match parse_thread_item(raw.clone()) {
        ThreadItem::Unknown(unknown) => {
            assert_eq!(unknown.id.as_deref(), Some("x"));
            assert_eq!(unknown.item_type.as_deref(), Some("newItemType"));
            assert_eq!(unknown.raw, raw);
        }
        other => panic!("expected unknown item, got {other:?}"),
    }
}
2751
/// Builds a fully populated `ThreadOptions` and checks how each setting shows
/// up in the thread-start, thread-resume and turn-start parameters.
#[test]
fn thread_options_builder_maps_extended_protocol_fields() {
    let collaboration_mode = CollaborationMode::new(
        CollaborationModeKind::Default,
        CollaborationModeSettings::new("gpt-5.2-codex")
            .with_reasoning_effort(ModelReasoningEffort::High),
    );
    let demo_tool = DynamicToolSpec::new(
        "demo_tool",
        "Demo dynamic tool",
        json!({"type": "object"}),
    );

    let options = ThreadOptions::builder()
        .model("gpt-5.2-codex")
        .model_provider("mock_provider")
        .sandbox_mode(SandboxMode::WorkspaceWrite)
        .sandbox_policy(json!({"type": "dangerFullAccess"}))
        .working_directory("/tmp/workspace")
        .skip_git_repo_check(true)
        .model_reasoning_effort(ModelReasoningEffort::None)
        .model_reasoning_summary(ModelReasoningSummary::Auto)
        .network_access_enabled(true)
        .web_search_mode(WebSearchMode::Live)
        .web_search_enabled(false)
        .approval_policy(ApprovalMode::OnRequest)
        .add_directory("/tmp/one")
        .add_directory("/tmp/two")
        .personality(Personality::Pragmatic)
        .base_instructions("base instructions")
        .developer_instructions("developer instructions")
        .ephemeral(true)
        .insert_config("sandbox_workspace_write.network_access", Value::Bool(true))
        .dynamic_tools(vec![demo_tool])
        .experimental_raw_events(true)
        .persist_extended_history(true)
        .collaboration_mode(collaboration_mode)
        .build();

    // Values that several assertions below compare against.
    let danger_policy = json!({"type": "dangerFullAccess"});
    let flag_on = Value::Bool(true);
    let flag_off = Value::Bool(false);

    // --- thread/start -----------------------------------------------------
    let thread_params = build_thread_start_params(&options);
    let thread_extra = &thread_params.extra;

    assert_eq!(thread_params.model.as_deref(), Some("gpt-5.2-codex"));
    assert_eq!(
        thread_params.model_provider.as_deref(),
        Some("mock_provider")
    );
    assert_eq!(thread_params.cwd.as_deref(), Some("/tmp/workspace"));
    assert_eq!(thread_params.approval_policy.as_deref(), Some("on-request"));
    assert_eq!(thread_params.sandbox.as_deref(), Some("workspace-write"));
    assert_eq!(thread_params.sandbox_policy, Some(danger_policy.clone()));
    assert_eq!(thread_params.effort.as_deref(), Some("none"));
    assert_eq!(thread_params.summary.as_deref(), Some("auto"));
    assert_eq!(thread_params.personality.as_deref(), Some("pragmatic"));
    assert_eq!(thread_params.ephemeral, Some(true));
    assert_eq!(
        thread_params.base_instructions.as_deref(),
        Some("base instructions")
    );
    assert_eq!(
        thread_params.developer_instructions.as_deref(),
        Some("developer instructions")
    );
    assert_eq!(thread_extra.get("skipGitRepoCheck"), Some(&flag_on));
    assert_eq!(thread_extra.get("webSearchMode"), Some(&json!("live")));
    assert_eq!(thread_extra.get("webSearchEnabled"), Some(&flag_off));
    assert_eq!(thread_extra.get("networkAccessEnabled"), Some(&flag_on));
    assert_eq!(
        thread_extra.get("additionalDirectories"),
        Some(&json!(["/tmp/one", "/tmp/two"]))
    );
    assert_eq!(
        thread_extra.get("config"),
        Some(&json!({"sandbox_workspace_write.network_access": true}))
    );
    assert_eq!(
        thread_extra.get("dynamicTools"),
        Some(&json!([{
            "name": "demo_tool",
            "description": "Demo dynamic tool",
            "inputSchema": {"type": "object"}
        }]))
    );
    assert_eq!(thread_extra.get("experimentalRawEvents"), Some(&flag_on));
    assert_eq!(thread_extra.get("persistExtendedHistory"), Some(&flag_on));

    // --- thread/resume ----------------------------------------------------
    let resume_params = build_thread_resume_params("thread_123", &options);
    let resume_extra = &resume_params.extra;

    assert_eq!(resume_params.thread_id, "thread_123");
    assert_eq!(resume_params.model.as_deref(), Some("gpt-5.2-codex"));
    assert_eq!(
        resume_params.model_provider.as_deref(),
        Some("mock_provider")
    );
    assert_eq!(resume_params.cwd.as_deref(), Some("/tmp/workspace"));
    assert_eq!(resume_params.approval_policy.as_deref(), Some("on-request"));
    assert_eq!(resume_params.sandbox.as_deref(), Some("workspace-write"));
    assert_eq!(resume_params.personality.as_deref(), Some("pragmatic"));
    assert_eq!(
        resume_params
            .config
            .as_ref()
            .and_then(|config| config.get("sandbox_workspace_write.network_access")),
        Some(&flag_on)
    );
    assert_eq!(resume_params.persist_extended_history, Some(true));
    // These settings are asserted under `extra` for the resume request.
    assert_eq!(resume_extra.get("sandboxPolicy"), Some(&danger_policy));
    assert_eq!(resume_extra.get("effort"), Some(&json!("none")));
    assert_eq!(resume_extra.get("summary"), Some(&json!("auto")));
    assert_eq!(resume_extra.get("experimentalRawEvents"), Some(&flag_on));

    // --- turn/start -------------------------------------------------------
    let turn_params = build_turn_start_params(
        "thread_123",
        Input::text("hello"),
        &options,
        &TurnOptions::default(),
    );

    assert_eq!(turn_params.model_provider.as_deref(), Some("mock_provider"));
    assert_eq!(turn_params.effort.as_deref(), Some("none"));
    assert_eq!(turn_params.summary.as_deref(), Some("auto"));
    assert_eq!(turn_params.personality.as_deref(), Some("pragmatic"));
    assert_eq!(turn_params.sandbox_policy, Some(danger_policy.clone()));
    assert_eq!(
        turn_params.extra.get("collaborationMode"),
        Some(&json!({
            "mode": "default",
            "settings": {
                "model": "gpt-5.2-codex",
                "reasoning_effort": "high"
            }
        }))
    );
}
2917
/// The value passed to `skip_git_repo_check` is forwarded verbatim as
/// `skipGitRepoCheck` in the thread-start extras, for both `true` and `false`.
#[test]
fn thread_options_builder_skip_git_repo_check_matches_cli_flag_semantics() {
    // Same order as a hand-written pair of checks: enabled first, then disabled.
    for flag in [true, false] {
        let options = ThreadOptions::builder().skip_git_repo_check(flag).build();
        let params = build_thread_start_params(&options);

        assert_eq!(
            params.extra.get("skipGitRepoCheck"),
            Some(&Value::Bool(flag))
        );
    }
}
2934
/// `output_schema_for::<T>()` on the builder results in turn parameters that
/// carry `T::openai_output_schema()`.
#[test]
fn turn_options_builder_sets_typed_output_schema() {
    let thread_options = ThreadOptions::default();
    let turn_options = TurnOptions::builder()
        .output_schema_for::<StructuredReply>()
        .build();

    let turn_params = build_turn_start_params(
        "thread_123",
        Input::text("hello"),
        &thread_options,
        &turn_options,
    );

    let expected_schema = StructuredReply::openai_output_schema();
    assert_eq!(turn_params.output_schema, Some(expected_schema));
}
2952
/// Calling `clear_output_schema` after `output_schema` leaves the turn
/// parameters without any output schema.
#[test]
fn turn_options_builder_clear_output_schema_overrides_previous_value() {
    let thread_options = ThreadOptions::default();
    let turn_options = TurnOptions::builder()
        .output_schema(json!({"type": "object"}))
        .clear_output_schema()
        .build();

    let turn_params = build_turn_start_params(
        "thread_123",
        Input::text("hello"),
        &thread_options,
        &turn_options,
    );

    assert_eq!(turn_params.output_schema, None);
}
2968
/// The by-value helpers on `TurnOptions` store either the raw schema they are
/// given or the schema derived from a type.
#[test]
fn turn_options_value_helpers_set_raw_and_typed_schemas() {
    // Raw JSON schema supplied directly.
    let schema = json!({"type": "object"});
    let raw = TurnOptions::default().with_output_schema(schema.clone());
    assert_eq!(raw.output_schema, Some(schema));

    // Schema derived from a typed reply.
    let typed = TurnOptions::default().with_output_schema_for::<StructuredReply>();
    let expected_schema = StructuredReply::openai_output_schema();
    assert_eq!(typed.output_schema, Some(expected_schema));
}
2980
/// When a setting is present on both the thread and the turn, the turn-start
/// parameters carry the turn's value.
#[test]
fn turn_options_builder_overrides_thread_defaults() {
    // Thread-level defaults; every one is overridden by the turn below.
    let thread_options = ThreadOptions::builder()
        .model("gpt-5-thread-default")
        .model_provider("provider-thread")
        .working_directory("/tmp/thread")
        .model_reasoning_effort(ModelReasoningEffort::Low)
        .model_reasoning_summary(ModelReasoningSummary::Auto)
        .personality(Personality::Friendly)
        .approval_policy(ApprovalMode::OnRequest)
        .sandbox_policy(json!({"thread": true}))
        .skip_git_repo_check(false)
        .network_access_enabled(false)
        .web_search_mode(WebSearchMode::Cached)
        .web_search_enabled(false)
        .add_directory("/tmp/thread-dir")
        .build();

    // Per-turn overrides, plus one extra key that exists only on the turn.
    let turn_options = TurnOptions::builder()
        .model("gpt-5-turn-override")
        .model_provider("provider-turn")
        .working_directory("/tmp/turn")
        .model_reasoning_effort(ModelReasoningEffort::High)
        .model_reasoning_summary(ModelReasoningSummary::Detailed)
        .personality(Personality::Pragmatic)
        .approval_policy(ApprovalMode::Never)
        .sandbox_policy(json!({"turn": true}))
        .skip_git_repo_check(true)
        .network_access_enabled(true)
        .web_search_mode(WebSearchMode::Live)
        .web_search_enabled(true)
        .add_directory("/tmp/turn-dir")
        .insert_extra("customTurnFlag", Value::Bool(true))
        .build();

    let params = build_turn_start_params(
        "thread_123",
        Input::text("hello"),
        &thread_options,
        &turn_options,
    );
    let extra = &params.extra;
    let flag_on = Value::Bool(true);

    assert_eq!(params.cwd.as_deref(), Some("/tmp/turn"));
    assert_eq!(params.model.as_deref(), Some("gpt-5-turn-override"));
    assert_eq!(params.model_provider.as_deref(), Some("provider-turn"));
    assert_eq!(params.effort.as_deref(), Some("high"));
    assert_eq!(params.summary.as_deref(), Some("detailed"));
    assert_eq!(params.personality.as_deref(), Some("pragmatic"));
    assert_eq!(params.approval_policy.as_deref(), Some("never"));
    assert_eq!(params.sandbox_policy, Some(json!({"turn": true})));
    assert_eq!(extra.get("skipGitRepoCheck"), Some(&flag_on));
    assert_eq!(extra.get("networkAccessEnabled"), Some(&flag_on));
    assert_eq!(extra.get("webSearchMode"), Some(&json!("live")));
    assert_eq!(extra.get("webSearchEnabled"), Some(&flag_on));
    assert_eq!(
        extra.get("additionalDirectories"),
        Some(&json!(["/tmp/turn-dir"]))
    );
    assert_eq!(extra.get("customTurnFlag"), Some(&flag_on));
}
3053}