1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
6use std::time::Duration;
7
8use serde::Serialize;
9use serde_json::{Value, json};
10use tokio::sync::{Mutex, RwLock, broadcast, mpsc, oneshot};
11
12use crate::api::{Codex, ResumeThread, Thread, ThreadOptions};
13use crate::error::{ClientError, IncomingClassified, RpcError, classify_incoming};
14use crate::events::{
15 ServerEvent, ServerNotification, ServerRequestEvent, parse_notification, parse_server_request,
16};
17use crate::protocol::requests;
18use crate::protocol::responses;
19use crate::protocol::server_requests;
20use crate::protocol::shared::{EmptyObject, RequestId};
21use crate::transport::TransportHandle;
22use crate::transport::stdio::spawn_stdio_transport;
23use crate::transport::ws::connect_ws_transport;
24use crate::transport::ws_daemon::ensure_local_ws_app_server;
25
26type PendingMap = HashMap<RequestId, oneshot::Sender<Result<Value, RpcError>>>;
27type RefreshFuture = Pin<
28 Box<
29 dyn Future<Output = Result<server_requests::ChatgptAuthTokensRefreshResponse, ClientError>>
30 + Send,
31 >,
32>;
33type RefreshHandler =
34 Arc<dyn Fn(server_requests::ChatgptAuthTokensRefreshParams) -> RefreshFuture + Send + Sync>;
35type ApplyPatchApprovalFuture = Pin<
36 Box<
37 dyn Future<Output = Result<server_requests::ApplyPatchApprovalResponse, ClientError>>
38 + Send,
39 >,
40>;
41type ApplyPatchApprovalHandler = Arc<
42 dyn Fn(server_requests::ApplyPatchApprovalParams) -> ApplyPatchApprovalFuture + Send + Sync,
43>;
44type ExecCommandApprovalFuture = Pin<
45 Box<
46 dyn Future<Output = Result<server_requests::ExecCommandApprovalResponse, ClientError>>
47 + Send,
48 >,
49>;
50type ExecCommandApprovalHandler = Arc<
51 dyn Fn(server_requests::ExecCommandApprovalParams) -> ExecCommandApprovalFuture + Send + Sync,
52>;
53type CommandExecutionRequestApprovalFuture = Pin<
54 Box<
55 dyn Future<
56 Output = Result<
57 server_requests::CommandExecutionRequestApprovalResponse,
58 ClientError,
59 >,
60 > + Send,
61 >,
62>;
63type CommandExecutionRequestApprovalHandler = Arc<
64 dyn Fn(
65 server_requests::CommandExecutionRequestApprovalParams,
66 ) -> CommandExecutionRequestApprovalFuture
67 + Send
68 + Sync,
69>;
70type FileChangeRequestApprovalFuture = Pin<
71 Box<
72 dyn Future<Output = Result<server_requests::FileChangeRequestApprovalResponse, ClientError>>
73 + Send,
74 >,
75>;
76type FileChangeRequestApprovalHandler = Arc<
77 dyn Fn(server_requests::FileChangeRequestApprovalParams) -> FileChangeRequestApprovalFuture
78 + Send
79 + Sync,
80>;
81type ToolRequestUserInputFuture = Pin<
82 Box<
83 dyn Future<Output = Result<server_requests::ToolRequestUserInputResponse, ClientError>>
84 + Send,
85 >,
86>;
87type ToolRequestUserInputHandler = Arc<
88 dyn Fn(server_requests::ToolRequestUserInputParams) -> ToolRequestUserInputFuture + Send + Sync,
89>;
90type DynamicToolCallFuture = Pin<
91 Box<dyn Future<Output = Result<server_requests::DynamicToolCallResponse, ClientError>> + Send>,
92>;
93type DynamicToolCallHandler =
94 Arc<dyn Fn(server_requests::DynamicToolCallParams) -> DynamicToolCallFuture + Send + Sync>;
95
/// Options shared by every transport flavour of the client.
#[derive(Debug, Clone)]
pub struct ClientOptions {
    /// Timeout applied to a request when the caller does not supply one.
    pub default_timeout: Duration,
}

impl Default for ClientOptions {
    /// Returns options with a 30 second default request timeout.
    fn default() -> Self {
        let default_timeout = Duration::from_secs(30);
        Self { default_timeout }
    }
}
108
109#[derive(Debug, Clone)]
110pub struct StdioConfig {
111 pub codex_binary: String,
112 pub args: Vec<String>,
113 pub env: HashMap<String, String>,
114 pub options: ClientOptions,
115}
116
117impl Default for StdioConfig {
118 fn default() -> Self {
119 Self {
120 codex_binary: "codex".to_string(),
121 args: vec!["app-server".to_string()],
122 env: HashMap::new(),
123 options: ClientOptions::default(),
124 }
125 }
126}
127
128#[derive(Debug, Clone)]
129pub struct WsConfig {
130 pub url: String,
131 pub env: HashMap<String, String>,
132 pub options: ClientOptions,
133}
134
135impl WsConfig {
136 pub fn new(
137 url: impl Into<String>,
138 env: HashMap<String, String>,
139 options: ClientOptions,
140 ) -> Self {
141 Self {
142 url: url.into(),
143 env,
144 options,
145 }
146 }
147
148 pub fn with_url(mut self, url: impl Into<String>) -> Self {
149 self.url = url.into();
150 self
151 }
152
153 pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
154 self.env = env;
155 self
156 }
157}
158
159impl Default for WsConfig {
160 fn default() -> Self {
161 Self {
162 url: String::from("ws://127.0.0.1:4222"),
163 env: HashMap::new(),
164 options: ClientOptions::default(),
165 }
166 }
167}
168
/// State shared between every `CodexClient` clone and the background inbound loop.
struct Inner {
    /// Queue of outgoing frames handed to the transport.
    outbound: mpsc::Sender<Value>,
    /// Requests that were sent and are still waiting for their response.
    pending: Mutex<PendingMap>,
    /// Timeout used for requests that do not specify their own.
    default_timeout: Duration,
    /// Set once the `initialize` request has succeeded.
    initialized: AtomicBool,
    /// Set once the `initialized` notification has been sent; gates ordinary requests.
    ready: AtomicBool,
    /// Source of integer request ids; starts at 1 and increments per request.
    next_id: AtomicI64,
    /// Broadcast sender on which server events are published.
    event_tx: broadcast::Sender<ServerEvent>,
    /// Receiver created together with `event_tx`; drained by `CodexClient::next_event`.
    event_rx: Mutex<broadcast::Receiver<ServerEvent>>,
    // Optional per-request-kind handlers. When one is set, the matching server request is
    // answered from the handler's result instead of being published as an event.
    refresh_handler: RwLock<Option<RefreshHandler>>,
    apply_patch_approval_handler: RwLock<Option<ApplyPatchApprovalHandler>>,
    exec_command_approval_handler: RwLock<Option<ExecCommandApprovalHandler>>,
    command_execution_request_approval_handler:
        RwLock<Option<CommandExecutionRequestApprovalHandler>>,
    file_change_request_approval_handler: RwLock<Option<FileChangeRequestApprovalHandler>>,
    tool_request_user_input_handler: RwLock<Option<ToolRequestUserInputHandler>>,
    dynamic_tool_call_handler: RwLock<Option<DynamicToolCallHandler>>,
}
187
/// Handle to a connected app-server session.
///
/// Cloning is cheap: all clones share the same `Arc<Inner>` and therefore the same transport,
/// pending-request table, event channel and registered handlers.
#[derive(Clone)]
pub struct CodexClient {
    inner: Arc<Inner>,
}
192
/// Generates `pub async fn $fn_name(&self, params: $params_ty) -> Result<$result_ty, ClientError>`.
///
/// The generated method sends `$method` with the serialized `params`, uses the client's default
/// timeout (`None` is passed), and requires the client to be ready (`true` is passed).
macro_rules! typed_method {
    ($fn_name:ident, $method:literal, $params_ty:ty, $result_ty:ty) => {
        pub async fn $fn_name(&self, params: $params_ty) -> Result<$result_ty, ClientError> {
            self.request_typed_internal($method, params, None, true)
                .await
        }
    };
}
201
/// Generates `pub async fn $fn_name(&self) -> Result<$result_ty, ClientError>`.
///
/// Like `typed_method!`, but for requests without parameters: `params` is sent as JSON `null`.
macro_rules! typed_null_method {
    ($fn_name:ident, $method:literal, $result_ty:ty) => {
        pub async fn $fn_name(&self) -> Result<$result_ty, ClientError> {
            self.request_typed_value_internal($method, Value::Null, None, true)
                .await
        }
    };
}
210
211impl CodexClient {
212 pub async fn spawn_stdio(config: StdioConfig) -> Result<Self, ClientError> {
213 let handle = spawn_stdio_transport(&config.codex_binary, &config.args, &config.env).await?;
214 Ok(Self::from_transport(handle, config.options.default_timeout))
215 }
216
217 pub async fn connect_ws(config: WsConfig) -> Result<Self, ClientError> {
218 let handle = connect_ws_transport(&config.url).await?;
219 Ok(Self::from_transport(handle, config.options.default_timeout))
220 }
221
222 pub async fn start_and_connect_ws(config: WsConfig) -> Result<Self, ClientError> {
223 ensure_local_ws_app_server(&config.url, &config.env).await?;
224
225 let handle = connect_ws_transport(&config.url).await?;
226 Ok(Self::from_transport(handle, config.options.default_timeout))
227 }
228
229 fn from_transport(handle: TransportHandle, default_timeout: Duration) -> Self {
230 let (event_tx, event_rx) = broadcast::channel(1024);
231 let inner = Arc::new(Inner {
232 outbound: handle.outbound,
233 pending: Mutex::new(HashMap::new()),
234 default_timeout,
235 initialized: AtomicBool::new(false),
236 ready: AtomicBool::new(false),
237 next_id: AtomicI64::new(1),
238 event_tx,
239 event_rx: Mutex::new(event_rx),
240 refresh_handler: RwLock::new(None),
241 apply_patch_approval_handler: RwLock::new(None),
242 exec_command_approval_handler: RwLock::new(None),
243 command_execution_request_approval_handler: RwLock::new(None),
244 file_change_request_approval_handler: RwLock::new(None),
245 tool_request_user_input_handler: RwLock::new(None),
246 dynamic_tool_call_handler: RwLock::new(None),
247 });
248
249 tokio::spawn(run_inbound_loop(handle.inbound, inner.clone()));
250 Self { inner }
251 }
252
253 pub fn as_api(&self) -> Codex {
254 Codex::from_client(self.clone())
255 }
256
257 pub fn start_thread(&self, options: ThreadOptions) -> Thread {
258 self.as_api().start_thread(options)
259 }
260
261 pub fn resume_thread(&self, target: impl Into<ResumeThread>, options: ThreadOptions) -> Thread {
262 self.as_api().resume_thread(target, options)
263 }
264
265 pub fn resume_thread_by_id(&self, id: impl Into<String>, options: ThreadOptions) -> Thread {
266 self.as_api().resume_thread_by_id(id, options)
267 }
268
269 pub fn resume_latest_thread(&self, options: ThreadOptions) -> Thread {
270 self.as_api().resume_latest_thread(options)
271 }
272
273 pub fn subscribe(&self) -> broadcast::Receiver<ServerEvent> {
274 self.inner.event_tx.subscribe()
275 }
276
277 pub async fn next_event(&self) -> Result<ServerEvent, ClientError> {
278 let mut rx = self.inner.event_rx.lock().await;
279 rx.recv().await.map_err(|err| {
280 ClientError::TransportSend(format!("event channel receive failed: {err}"))
281 })
282 }
283
284 pub async fn set_chatgpt_auth_tokens_refresh_handler<F, Fut>(&self, handler: F)
285 where
286 F: Fn(server_requests::ChatgptAuthTokensRefreshParams) -> Fut + Send + Sync + 'static,
287 Fut: Future<Output = Result<server_requests::ChatgptAuthTokensRefreshResponse, ClientError>>
288 + Send
289 + 'static,
290 {
291 let wrapped: RefreshHandler = Arc::new(move |params| Box::pin(handler(params)));
292 *self.inner.refresh_handler.write().await = Some(wrapped);
293 }
294
295 pub async fn clear_chatgpt_auth_tokens_refresh_handler(&self) {
296 *self.inner.refresh_handler.write().await = None;
297 }
298
299 pub async fn set_apply_patch_approval_handler<F, Fut>(&self, handler: F)
300 where
301 F: Fn(server_requests::ApplyPatchApprovalParams) -> Fut + Send + Sync + 'static,
302 Fut: Future<Output = Result<server_requests::ApplyPatchApprovalResponse, ClientError>>
303 + Send
304 + 'static,
305 {
306 let wrapped: ApplyPatchApprovalHandler = Arc::new(move |params| Box::pin(handler(params)));
307 *self.inner.apply_patch_approval_handler.write().await = Some(wrapped);
308 }
309
310 pub async fn clear_apply_patch_approval_handler(&self) {
311 *self.inner.apply_patch_approval_handler.write().await = None;
312 }
313
314 pub async fn set_exec_command_approval_handler<F, Fut>(&self, handler: F)
315 where
316 F: Fn(server_requests::ExecCommandApprovalParams) -> Fut + Send + Sync + 'static,
317 Fut: Future<Output = Result<server_requests::ExecCommandApprovalResponse, ClientError>>
318 + Send
319 + 'static,
320 {
321 let wrapped: ExecCommandApprovalHandler = Arc::new(move |params| Box::pin(handler(params)));
322 *self.inner.exec_command_approval_handler.write().await = Some(wrapped);
323 }
324
325 pub async fn clear_exec_command_approval_handler(&self) {
326 *self.inner.exec_command_approval_handler.write().await = None;
327 }
328
329 pub async fn set_command_execution_request_approval_handler<F, Fut>(&self, handler: F)
330 where
331 F: Fn(server_requests::CommandExecutionRequestApprovalParams) -> Fut
332 + Send
333 + Sync
334 + 'static,
335 Fut: Future<
336 Output = Result<
337 server_requests::CommandExecutionRequestApprovalResponse,
338 ClientError,
339 >,
340 > + Send
341 + 'static,
342 {
343 let wrapped: CommandExecutionRequestApprovalHandler =
344 Arc::new(move |params| Box::pin(handler(params)));
345 *self
346 .inner
347 .command_execution_request_approval_handler
348 .write()
349 .await = Some(wrapped);
350 }
351
352 pub async fn clear_command_execution_request_approval_handler(&self) {
353 *self
354 .inner
355 .command_execution_request_approval_handler
356 .write()
357 .await = None;
358 }
359
360 pub async fn set_file_change_request_approval_handler<F, Fut>(&self, handler: F)
361 where
362 F: Fn(server_requests::FileChangeRequestApprovalParams) -> Fut + Send + Sync + 'static,
363 Fut: Future<Output = Result<server_requests::FileChangeRequestApprovalResponse, ClientError>>
364 + Send
365 + 'static,
366 {
367 let wrapped: FileChangeRequestApprovalHandler =
368 Arc::new(move |params| Box::pin(handler(params)));
369 *self
370 .inner
371 .file_change_request_approval_handler
372 .write()
373 .await = Some(wrapped);
374 }
375
376 pub async fn clear_file_change_request_approval_handler(&self) {
377 *self
378 .inner
379 .file_change_request_approval_handler
380 .write()
381 .await = None;
382 }
383
384 pub async fn set_tool_request_user_input_handler<F, Fut>(&self, handler: F)
385 where
386 F: Fn(server_requests::ToolRequestUserInputParams) -> Fut + Send + Sync + 'static,
387 Fut: Future<Output = Result<server_requests::ToolRequestUserInputResponse, ClientError>>
388 + Send
389 + 'static,
390 {
391 let wrapped: ToolRequestUserInputHandler =
392 Arc::new(move |params| Box::pin(handler(params)));
393 *self.inner.tool_request_user_input_handler.write().await = Some(wrapped);
394 }
395
396 pub async fn clear_tool_request_user_input_handler(&self) {
397 *self.inner.tool_request_user_input_handler.write().await = None;
398 }
399
400 pub async fn set_dynamic_tool_call_handler<F, Fut>(&self, handler: F)
401 where
402 F: Fn(server_requests::DynamicToolCallParams) -> Fut + Send + Sync + 'static,
403 Fut: Future<Output = Result<server_requests::DynamicToolCallResponse, ClientError>>
404 + Send
405 + 'static,
406 {
407 let wrapped: DynamicToolCallHandler = Arc::new(move |params| Box::pin(handler(params)));
408 *self.inner.dynamic_tool_call_handler.write().await = Some(wrapped);
409 }
410
411 pub async fn clear_dynamic_tool_call_handler(&self) {
412 *self.inner.dynamic_tool_call_handler.write().await = None;
413 }
414
415 pub async fn initialize(
416 &self,
417 params: requests::InitializeParams,
418 ) -> Result<responses::InitializeResult, ClientError> {
419 if self.inner.initialized.load(Ordering::SeqCst) {
420 return Err(ClientError::AlreadyInitialized);
421 }
422
423 let result: responses::InitializeResult = self
424 .request_typed_internal("initialize", params, None, false)
425 .await?;
426
427 self.inner.initialized.store(true, Ordering::SeqCst);
428 Ok(result)
429 }
430
431 pub async fn initialized(&self) -> Result<(), ClientError> {
432 if !self.inner.initialized.load(Ordering::SeqCst) {
433 return Err(ClientError::NotInitialized {
434 method: "initialized".to_string(),
435 });
436 }
437 self.send_notification("initialized", EmptyObject::default(), false)
438 .await?;
439 self.inner.ready.store(true, Ordering::SeqCst);
440 Ok(())
441 }
442
443 pub async fn send_raw_request(
444 &self,
445 method: impl Into<String>,
446 params: Value,
447 timeout: Option<Duration>,
448 ) -> Result<Value, ClientError> {
449 let method = method.into();
450 let requires_ready = method != "initialize";
451 self.request_value_internal(&method, params, timeout, requires_ready)
452 .await
453 }
454
455 pub async fn send_raw_notification(
456 &self,
457 method: impl Into<String>,
458 params: Value,
459 ) -> Result<(), ClientError> {
460 let method = method.into();
461 let requires_ready = method != "initialized";
462 self.send_notification(&method, params, requires_ready)
463 .await
464 }
465
466 pub async fn respond_server_request<R: Serialize>(
467 &self,
468 id: RequestId,
469 result: R,
470 ) -> Result<(), ClientError> {
471 let result = serde_json::to_value(result)?;
472 self.send_message(json!({ "id": id, "result": result }))
473 .await
474 }
475
476 pub async fn respond_server_request_error(
477 &self,
478 id: RequestId,
479 error: RpcError,
480 ) -> Result<(), ClientError> {
481 self.send_message(json!({ "id": id, "error": error })).await
482 }
483
    // Typed convenience wrappers around `respond_server_request`: each one fixes the response
    // type for a single kind of server request and otherwise delegates unchanged.

    /// Replies to a `ChatgptAuthTokensRefresh` server request.
    pub async fn respond_chatgpt_auth_tokens_refresh(
        &self,
        id: RequestId,
        response: server_requests::ChatgptAuthTokensRefreshResponse,
    ) -> Result<(), ClientError> {
        self.respond_server_request(id, response).await
    }

    /// Replies to an `ApplyPatchApproval` server request.
    pub async fn respond_apply_patch_approval(
        &self,
        id: RequestId,
        response: server_requests::ApplyPatchApprovalResponse,
    ) -> Result<(), ClientError> {
        self.respond_server_request(id, response).await
    }

    /// Replies to an `ExecCommandApproval` server request.
    pub async fn respond_exec_command_approval(
        &self,
        id: RequestId,
        response: server_requests::ExecCommandApprovalResponse,
    ) -> Result<(), ClientError> {
        self.respond_server_request(id, response).await
    }

    /// Replies to a `CommandExecutionRequestApproval` server request.
    pub async fn respond_command_execution_request_approval(
        &self,
        id: RequestId,
        response: server_requests::CommandExecutionRequestApprovalResponse,
    ) -> Result<(), ClientError> {
        self.respond_server_request(id, response).await
    }

    /// Replies to a `FileChangeRequestApproval` server request.
    pub async fn respond_file_change_request_approval(
        &self,
        id: RequestId,
        response: server_requests::FileChangeRequestApprovalResponse,
    ) -> Result<(), ClientError> {
        self.respond_server_request(id, response).await
    }

    /// Replies to a `ToolRequestUserInput` server request.
    pub async fn respond_tool_request_user_input(
        &self,
        id: RequestId,
        response: server_requests::ToolRequestUserInputResponse,
    ) -> Result<(), ClientError> {
        self.respond_server_request(id, response).await
    }

    /// Replies to a `DynamicToolCall` server request.
    pub async fn respond_dynamic_tool_call(
        &self,
        id: RequestId,
        response: server_requests::DynamicToolCallResponse,
    ) -> Result<(), ClientError> {
        self.respond_server_request(id, response).await
    }
539
    // Typed request methods. Each `typed_method!` line expands to
    // `pub async fn <name>(&self, params: <Params>) -> Result<<Result>, ClientError>`, which sends
    // the given method string, uses the default timeout and requires the client to be ready.

    // --- thread/* ---
    typed_method!(
        thread_start,
        "thread/start",
        requests::ThreadStartParams,
        responses::ThreadResult
    );
    typed_method!(
        thread_resume,
        "thread/resume",
        requests::ThreadResumeParams,
        responses::ThreadResult
    );
    typed_method!(
        thread_fork,
        "thread/fork",
        requests::ThreadForkParams,
        responses::ThreadResult
    );
    typed_method!(
        thread_archive,
        "thread/archive",
        requests::ThreadArchiveParams,
        responses::ThreadArchiveResult
    );
    typed_method!(
        thread_name_set,
        "thread/name/set",
        requests::ThreadSetNameParams,
        responses::ThreadSetNameResult
    );
    typed_method!(
        thread_unarchive,
        "thread/unarchive",
        requests::ThreadUnarchiveParams,
        responses::ThreadUnarchiveResult
    );
    typed_method!(
        thread_compact_start,
        "thread/compact/start",
        requests::ThreadCompactStartParams,
        responses::ThreadCompactStartResult
    );
    typed_method!(
        thread_background_terminals_clean,
        "thread/backgroundTerminals/clean",
        requests::ThreadBackgroundTerminalsCleanParams,
        responses::ThreadBackgroundTerminalsCleanResult
    );
    typed_method!(
        thread_rollback,
        "thread/rollback",
        requests::ThreadRollbackParams,
        responses::ThreadRollbackResult
    );
    typed_method!(
        thread_list,
        "thread/list",
        requests::ThreadListParams,
        responses::ThreadListResult
    );
    typed_method!(
        thread_loaded_list,
        "thread/loaded/list",
        requests::ThreadLoadedListParams,
        responses::ThreadLoadedListResult
    );
    typed_method!(
        thread_read,
        "thread/read",
        requests::ThreadReadParams,
        responses::ThreadReadResult
    );

    // --- skills/* and app/* ---
    typed_method!(
        skills_list,
        "skills/list",
        requests::SkillsListParams,
        responses::SkillsListResult
    );
    // Note: the method is named after the wire method ("list") while the types say "Read".
    typed_method!(
        skills_remote_list,
        "skills/remote/list",
        requests::SkillsRemoteReadParams,
        responses::SkillsRemoteReadResult
    );
    // Note: the method is named after the wire method ("export") while the types say "Write".
    typed_method!(
        skills_remote_export,
        "skills/remote/export",
        requests::SkillsRemoteWriteParams,
        responses::SkillsRemoteWriteResult
    );
    typed_method!(
        app_list,
        "app/list",
        requests::AppsListParams,
        responses::AppsListResult
    );
    typed_method!(
        skills_config_write,
        "skills/config/write",
        requests::SkillsConfigWriteParams,
        responses::SkillsConfigWriteResult
    );

    // --- turn/* and review/* ---
    typed_method!(
        turn_start,
        "turn/start",
        requests::TurnStartParams,
        responses::TurnResult
    );
    typed_method!(
        turn_steer,
        "turn/steer",
        requests::TurnSteerParams,
        responses::TurnSteerResult
    );
    typed_method!(
        turn_interrupt,
        "turn/interrupt",
        requests::TurnInterruptParams,
        EmptyObject
    );
    typed_method!(
        review_start,
        "review/start",
        requests::ReviewStartParams,
        responses::ReviewStartResult
    );

    // --- listings and experimental methods ---
    typed_method!(
        model_list,
        "model/list",
        requests::ModelListParams,
        responses::ModelListResult
    );
    typed_method!(
        experimental_feature_list,
        "experimentalFeature/list",
        requests::ExperimentalFeatureListParams,
        responses::ExperimentalFeatureListResult
    );
    typed_method!(
        collaboration_mode_list,
        "collaborationMode/list",
        requests::CollaborationModeListParams,
        responses::CollaborationModeListResult
    );
    typed_method!(
        mock_experimental_method,
        "mock/experimentalMethod",
        requests::MockExperimentalMethodParams,
        responses::MockExperimentalMethodResult
    );

    // --- MCP servers and sandbox setup ---
    typed_method!(
        mcp_server_oauth_login,
        "mcpServer/oauth/login",
        requests::McpServerOauthLoginParams,
        responses::McpServerOauthLoginResult
    );
    typed_method!(
        mcp_server_status_list,
        "mcpServerStatus/list",
        requests::ListMcpServerStatusParams,
        responses::McpServerStatusListResult
    );
    typed_method!(
        windows_sandbox_setup_start,
        "windowsSandbox/setupStart",
        requests::WindowsSandboxSetupStartParams,
        responses::WindowsSandboxSetupStartResult
    );

    // --- account login, feedback and command execution ---
    typed_method!(
        account_login_start,
        "account/login/start",
        requests::LoginAccountParams,
        responses::LoginAccountResult
    );
    typed_method!(
        account_login_cancel,
        "account/login/cancel",
        requests::CancelLoginAccountParams,
        EmptyObject
    );
    typed_method!(
        feedback_upload,
        "feedback/upload",
        requests::FeedbackUploadParams,
        responses::FeedbackUploadResult
    );
    typed_method!(
        command_exec,
        "command/exec",
        requests::CommandExecParams,
        responses::CommandExecResult
    );

    // --- config/* and account/read ---
    typed_method!(
        config_read,
        "config/read",
        requests::ConfigReadParams,
        responses::ConfigReadResult
    );
    typed_method!(
        config_value_write,
        "config/value/write",
        requests::ConfigValueWriteParams,
        responses::ConfigValueWriteResult
    );
    typed_method!(
        config_batch_write,
        "config/batchWrite",
        requests::ConfigBatchWriteParams,
        responses::ConfigBatchWriteResult
    );
    typed_method!(
        account_read,
        "account/read",
        requests::GetAccountParams,
        responses::GetAccountResult
    );

    // --- fuzzyFileSearch/* sessions ---
    typed_method!(
        fuzzy_file_search_session_start,
        "fuzzyFileSearch/sessionStart",
        requests::FuzzyFileSearchSessionStartParams,
        responses::FuzzyFileSearchSessionStartResult
    );
    typed_method!(
        fuzzy_file_search_session_update,
        "fuzzyFileSearch/sessionUpdate",
        requests::FuzzyFileSearchSessionUpdateParams,
        responses::FuzzyFileSearchSessionUpdateResult
    );
    typed_method!(
        fuzzy_file_search_session_stop,
        "fuzzyFileSearch/sessionStop",
        requests::FuzzyFileSearchSessionStopParams,
        responses::FuzzyFileSearchSessionStopResult
    );
774
    /// Alias for [`CodexClient::skills_remote_list`], named after its parameter/result types.
    pub async fn skills_remote_read(
        &self,
        params: requests::SkillsRemoteReadParams,
    ) -> Result<responses::SkillsRemoteReadResult, ClientError> {
        self.skills_remote_list(params).await
    }

    /// Alias for [`CodexClient::skills_remote_export`], named after its parameter/result types.
    pub async fn skills_remote_write(
        &self,
        params: requests::SkillsRemoteWriteParams,
    ) -> Result<responses::SkillsRemoteWriteResult, ClientError> {
        self.skills_remote_export(params).await
    }
789
    // Parameterless requests. Each `typed_null_method!` line expands to
    // `pub async fn <name>(&self) -> Result<<Result>, ClientError>` and sends `null` as params.
    typed_null_method!(
        config_mcp_server_reload,
        "config/mcpServer/reload",
        EmptyObject
    );
    typed_null_method!(account_logout, "account/logout", EmptyObject);
    typed_null_method!(
        account_rate_limits_read,
        "account/rateLimits/read",
        responses::AccountRateLimitsReadResult
    );
    typed_null_method!(
        config_requirements_read,
        "configRequirements/read",
        responses::ConfigRequirementsReadResult
    );
806
807 async fn send_notification<P: Serialize>(
808 &self,
809 method: &str,
810 params: P,
811 requires_ready: bool,
812 ) -> Result<(), ClientError> {
813 if requires_ready && !self.inner.ready.load(Ordering::SeqCst) {
814 return Err(ClientError::NotReady {
815 method: method.to_string(),
816 });
817 }
818
819 let value = serde_json::to_value(params)?;
820 self.send_message(json!({ "method": method, "params": value }))
821 .await
822 }
823
824 async fn request_typed_internal<P, R>(
825 &self,
826 method: &str,
827 params: P,
828 timeout: Option<Duration>,
829 requires_ready: bool,
830 ) -> Result<R, ClientError>
831 where
832 P: Serialize,
833 R: serde::de::DeserializeOwned,
834 {
835 let value = serde_json::to_value(params)?;
836 self.request_typed_value_internal(method, value, timeout, requires_ready)
837 .await
838 }
839
840 async fn request_typed_value_internal<R>(
841 &self,
842 method: &str,
843 params: Value,
844 timeout: Option<Duration>,
845 requires_ready: bool,
846 ) -> Result<R, ClientError>
847 where
848 R: serde::de::DeserializeOwned,
849 {
850 let raw = self
851 .request_value_internal(method, params, timeout, requires_ready)
852 .await?;
853
854 serde_json::from_value(raw).map_err(|source| ClientError::UnexpectedResult {
855 method: method.to_string(),
856 source,
857 })
858 }
859
860 async fn request_value_internal(
861 &self,
862 method: &str,
863 params: Value,
864 timeout: Option<Duration>,
865 requires_ready: bool,
866 ) -> Result<Value, ClientError> {
867 if requires_ready && !self.inner.ready.load(Ordering::SeqCst) {
868 return Err(ClientError::NotReady {
869 method: method.to_string(),
870 });
871 }
872
873 if method == "initialize" && self.inner.initialized.load(Ordering::SeqCst) {
874 return Err(ClientError::AlreadyInitialized);
875 }
876
877 let id_num = self.inner.next_id.fetch_add(1, Ordering::SeqCst);
878 let id = RequestId::Integer(id_num);
879
880 let request = json!({
881 "method": method,
882 "id": id,
883 "params": params,
884 });
885
886 let (tx, rx) = oneshot::channel();
887 self.inner.pending.lock().await.insert(id.clone(), tx);
888
889 if let Err(err) = self.send_message(request).await {
890 self.inner.pending.lock().await.remove(&id);
891 return Err(err);
892 }
893
894 let timeout = timeout.unwrap_or(self.inner.default_timeout);
895 match tokio::time::timeout(timeout, rx).await {
896 Ok(Ok(Ok(value))) => Ok(value),
897 Ok(Ok(Err(error))) => Err(ClientError::Rpc { error }),
898 Ok(Err(_)) => Err(ClientError::TransportClosed),
899 Err(_) => {
900 self.inner.pending.lock().await.remove(&id);
901 Err(ClientError::Timeout {
902 method: method.to_string(),
903 timeout_ms: timeout.as_millis() as u64,
904 })
905 }
906 }
907 }
908
909 async fn send_message(&self, value: Value) -> Result<(), ClientError> {
910 self.inner.outbound.send(value).await.map_err(|err| {
911 ClientError::TransportSend(format!("failed to send outbound frame: {err}"))
912 })
913 }
914}
915
916async fn run_inbound_loop(
917 mut inbound: mpsc::Receiver<Result<Value, ClientError>>,
918 inner: Arc<Inner>,
919) {
920 while let Some(frame) = inbound.recv().await {
921 match frame {
922 Ok(value) => {
923 if let Err(err) = process_incoming_value(value, &inner).await {
924 fail_all_pending(&inner, &format!("processing inbound frame failed: {err}"))
925 .await;
926 let _ = inner.event_tx.send(ServerEvent::TransportClosed);
927 break;
928 }
929 }
930 Err(err) => {
931 fail_all_pending(&inner, &format!("transport error: {err}")).await;
932 let _ = inner.event_tx.send(ServerEvent::TransportClosed);
933 break;
934 }
935 }
936 }
937}
938
939async fn process_incoming_value(value: Value, inner: &Arc<Inner>) -> Result<(), ClientError> {
940 match classify_incoming(value)? {
941 IncomingClassified::Response { id, result } => {
942 if let Some(sender) = inner.pending.lock().await.remove(&id) {
943 let _ = sender.send(result);
944 }
945 }
946 IncomingClassified::Notification {
947 method,
948 params,
949 raw: _,
950 } => {
951 let parsed = parse_notification(method.clone(), params.clone())
952 .unwrap_or(ServerNotification::Unknown { method, params });
953 let _ = inner.event_tx.send(ServerEvent::Notification(parsed));
954 }
955 IncomingClassified::ServerRequest {
956 id,
957 method,
958 params,
959 raw: _,
960 } => {
961 let parsed = parse_server_request(id.clone(), method.clone(), params.clone())
962 .unwrap_or(ServerRequestEvent::Unknown { id, method, params });
963 if !try_auto_handle_server_request(inner, &parsed).await {
964 let _ = inner.event_tx.send(ServerEvent::ServerRequest(parsed));
965 }
966 }
967 }
968 Ok(())
969}
970
971async fn try_auto_handle_server_request(inner: &Arc<Inner>, request: &ServerRequestEvent) -> bool {
972 match request {
973 ServerRequestEvent::ChatgptAuthTokensRefresh { id, params } => {
974 let handler = inner.refresh_handler.read().await.clone();
975 let Some(handler) = handler else {
976 return false;
977 };
978
979 let response = handler(params.clone()).await;
980 send_server_request_handler_result(inner, id, response, "chatgptAuthTokens refresh")
981 .await
982 }
983 ServerRequestEvent::ApplyPatchApproval { id, params } => {
984 let handler = inner.apply_patch_approval_handler.read().await.clone();
985 let Some(handler) = handler else {
986 return false;
987 };
988
989 let response = handler(params.clone()).await;
990 send_server_request_handler_result(inner, id, response, "applyPatchApproval").await
991 }
992 ServerRequestEvent::ExecCommandApproval { id, params } => {
993 let handler = inner.exec_command_approval_handler.read().await.clone();
994 let Some(handler) = handler else {
995 return false;
996 };
997
998 let response = handler(params.clone()).await;
999 send_server_request_handler_result(inner, id, response, "execCommandApproval").await
1000 }
1001 ServerRequestEvent::CommandExecutionRequestApproval { id, params } => {
1002 let handler = inner
1003 .command_execution_request_approval_handler
1004 .read()
1005 .await
1006 .clone();
1007 let Some(handler) = handler else {
1008 return false;
1009 };
1010
1011 let response = handler(params.clone()).await;
1012 send_server_request_handler_result(
1013 inner,
1014 id,
1015 response,
1016 "item/commandExecution/requestApproval",
1017 )
1018 .await
1019 }
1020 ServerRequestEvent::FileChangeRequestApproval { id, params } => {
1021 let handler = inner
1022 .file_change_request_approval_handler
1023 .read()
1024 .await
1025 .clone();
1026 let Some(handler) = handler else {
1027 return false;
1028 };
1029
1030 let response = handler(params.clone()).await;
1031 send_server_request_handler_result(
1032 inner,
1033 id,
1034 response,
1035 "item/fileChange/requestApproval",
1036 )
1037 .await
1038 }
1039 ServerRequestEvent::ToolRequestUserInput { id, params } => {
1040 let handler = inner.tool_request_user_input_handler.read().await.clone();
1041 let Some(handler) = handler else {
1042 return false;
1043 };
1044
1045 let response = handler(params.clone()).await;
1046 send_server_request_handler_result(inner, id, response, "item/tool/requestUserInput")
1047 .await
1048 }
1049 ServerRequestEvent::DynamicToolCall { id, params } => {
1050 let handler = inner.dynamic_tool_call_handler.read().await.clone();
1051 let Some(handler) = handler else {
1052 return false;
1053 };
1054
1055 let response = handler(params.clone()).await;
1056 send_server_request_handler_result(inner, id, response, "item/tool/call").await
1057 }
1058 _ => false,
1059 }
1060}
1061
1062async fn send_server_request_handler_result<R: Serialize>(
1063 inner: &Arc<Inner>,
1064 id: &RequestId,
1065 response: Result<R, ClientError>,
1066 context: &str,
1067) -> bool {
1068 let payload = match response {
1069 Ok(result) => json!({ "id": id, "result": result }),
1070 Err(err) => json!({
1071 "id": id,
1072 "error": {
1073 "code": -32001,
1074 "message": format!("{context} handler failed: {err}")
1075 }
1076 }),
1077 };
1078
1079 if inner.outbound.send(payload).await.is_err() {
1080 let _ = inner.event_tx.send(ServerEvent::TransportClosed);
1081 }
1082
1083 true
1084}
1085
1086async fn fail_all_pending(inner: &Arc<Inner>, message: &str) {
1087 let mut pending = inner.pending.lock().await;
1088 let entries = std::mem::take(&mut *pending);
1089 drop(pending);
1090
1091 for (_, sender) in entries {
1092 let _ = sender.send(Err(RpcError {
1093 code: -32098,
1094 message: message.to_string(),
1095 data: None,
1096 }));
1097 }
1098}
1099
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{Duration, timeout};

    /// Builds a client over in-memory channels.
    ///
    /// Returns the client, the sender used to inject inbound frames, and the receiver on which
    /// frames written by the client can be observed.
    fn test_client() -> (
        CodexClient,
        mpsc::Sender<Result<Value, ClientError>>,
        mpsc::Receiver<Value>,
    ) {
        let (transport_outbound_tx, transport_outbound_rx) = mpsc::channel::<Value>(32);
        let (transport_inbound_tx, transport_inbound_rx) =
            mpsc::channel::<Result<Value, ClientError>>(32);
        let client = CodexClient::from_transport(
            TransportHandle {
                outbound: transport_outbound_tx,
                inbound: transport_inbound_rx,
            },
            Duration::from_secs(5),
        );
        (client, transport_inbound_tx, transport_outbound_rx)
    }

    /// With a handler registered, an inbound `applyPatchApproval` request is answered on the
    /// outbound channel with the handler's result under the same id.
    #[tokio::test]
    async fn auto_handles_apply_patch_approval_when_handler_registered() {
        let (client, inbound_tx, mut outbound_rx) = test_client();

        client
            .set_apply_patch_approval_handler(|_| async {
                let mut response = server_requests::ApplyPatchApprovalResponse::default();
                response
                    .extra
                    .insert("decision".to_string(), Value::String("approve".to_string()));
                Ok(response)
            })
            .await;

        inbound_tx
            .send(Ok(json!({
                "id": 42,
                "method": "applyPatchApproval",
                "params": {}
            })))
            .await
            .expect("send inbound server request");

        let outbound = timeout(Duration::from_secs(2), outbound_rx.recv())
            .await
            .expect("timed out waiting for outbound response")
            .expect("expected outbound response frame");
        assert_eq!(outbound.get("id"), Some(&json!(42)));
        assert_eq!(
            outbound.pointer("/result/decision"),
            Some(&Value::String("approve".to_string()))
        );
    }

    /// Without a handler, the same request surfaces as a `ServerRequest` event and nothing is
    /// written to the outbound channel.
    #[tokio::test]
    async fn unhandled_server_request_is_published_as_event() {
        let (client, inbound_tx, mut outbound_rx) = test_client();

        inbound_tx
            .send(Ok(json!({
                "id": 7,
                "method": "applyPatchApproval",
                "params": {}
            })))
            .await
            .expect("send inbound server request");

        let event = timeout(Duration::from_secs(2), client.next_event())
            .await
            .expect("timed out waiting for event")
            .expect("event receive");

        match event {
            ServerEvent::ServerRequest(ServerRequestEvent::ApplyPatchApproval { id, .. }) => {
                assert_eq!(id, RequestId::Integer(7));
            }
            other => panic!("unexpected event: {other:?}"),
        }

        // A short wait that elapses means no frame was sent in response.
        assert!(
            timeout(Duration::from_millis(200), outbound_rx.recv())
                .await
                .is_err(),
            "did not expect auto-response when handler is absent"
        );
    }
}