1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
6use std::time::Duration;
7
8use serde::Serialize;
9use serde_json::{Value, json};
10use tokio::sync::{Mutex, RwLock, broadcast, mpsc, oneshot};
11
12use crate::api::{Codex, Thread, ThreadOptions};
13use crate::error::{ClientError, IncomingClassified, RpcError, classify_incoming};
14use crate::events::{
15 ServerEvent, ServerNotification, ServerRequestEvent, parse_notification, parse_server_request,
16};
17use crate::protocol::requests;
18use crate::protocol::responses;
19use crate::protocol::server_requests;
20use crate::protocol::shared::{EmptyObject, RequestId};
21use crate::transport::TransportHandle;
22use crate::transport::stdio::spawn_stdio_transport;
23use crate::transport::ws::connect_ws_transport;
24use crate::transport::ws_daemon::ensure_local_ws_app_server;
25
26type PendingMap = HashMap<RequestId, oneshot::Sender<Result<Value, RpcError>>>;
27type RefreshFuture = Pin<
28 Box<
29 dyn Future<Output = Result<server_requests::ChatgptAuthTokensRefreshResponse, ClientError>>
30 + Send,
31 >,
32>;
33type RefreshHandler =
34 Arc<dyn Fn(server_requests::ChatgptAuthTokensRefreshParams) -> RefreshFuture + Send + Sync>;
35type ApplyPatchApprovalFuture = Pin<
36 Box<
37 dyn Future<Output = Result<server_requests::ApplyPatchApprovalResponse, ClientError>>
38 + Send,
39 >,
40>;
41type ApplyPatchApprovalHandler = Arc<
42 dyn Fn(server_requests::ApplyPatchApprovalParams) -> ApplyPatchApprovalFuture + Send + Sync,
43>;
44type ExecCommandApprovalFuture = Pin<
45 Box<
46 dyn Future<Output = Result<server_requests::ExecCommandApprovalResponse, ClientError>>
47 + Send,
48 >,
49>;
50type ExecCommandApprovalHandler = Arc<
51 dyn Fn(server_requests::ExecCommandApprovalParams) -> ExecCommandApprovalFuture + Send + Sync,
52>;
53type CommandExecutionRequestApprovalFuture = Pin<
54 Box<
55 dyn Future<
56 Output = Result<
57 server_requests::CommandExecutionRequestApprovalResponse,
58 ClientError,
59 >,
60 > + Send,
61 >,
62>;
63type CommandExecutionRequestApprovalHandler = Arc<
64 dyn Fn(
65 server_requests::CommandExecutionRequestApprovalParams,
66 ) -> CommandExecutionRequestApprovalFuture
67 + Send
68 + Sync,
69>;
70type FileChangeRequestApprovalFuture = Pin<
71 Box<
72 dyn Future<Output = Result<server_requests::FileChangeRequestApprovalResponse, ClientError>>
73 + Send,
74 >,
75>;
76type FileChangeRequestApprovalHandler = Arc<
77 dyn Fn(server_requests::FileChangeRequestApprovalParams) -> FileChangeRequestApprovalFuture
78 + Send
79 + Sync,
80>;
81type ToolRequestUserInputFuture = Pin<
82 Box<
83 dyn Future<Output = Result<server_requests::ToolRequestUserInputResponse, ClientError>>
84 + Send,
85 >,
86>;
87type ToolRequestUserInputHandler = Arc<
88 dyn Fn(server_requests::ToolRequestUserInputParams) -> ToolRequestUserInputFuture + Send + Sync,
89>;
90type DynamicToolCallFuture = Pin<
91 Box<dyn Future<Output = Result<server_requests::DynamicToolCallResponse, ClientError>> + Send>,
92>;
93type DynamicToolCallHandler =
94 Arc<dyn Fn(server_requests::DynamicToolCallParams) -> DynamicToolCallFuture + Send + Sync>;
95
96#[derive(Debug, Clone)]
97pub struct ClientOptions {
98 pub default_timeout: Duration,
99}
100
101impl Default for ClientOptions {
102 fn default() -> Self {
103 Self {
104 default_timeout: Duration::from_secs(30),
105 }
106 }
107}
108
109#[derive(Debug, Clone)]
110pub struct StdioConfig {
111 pub codex_binary: String,
112 pub args: Vec<String>,
113 pub env: HashMap<String, String>,
114 pub options: ClientOptions,
115}
116
117impl Default for StdioConfig {
118 fn default() -> Self {
119 Self {
120 codex_binary: "codex".to_string(),
121 args: vec!["app-server".to_string()],
122 env: HashMap::new(),
123 options: ClientOptions::default(),
124 }
125 }
126}
127
128#[derive(Debug, Clone)]
129pub struct WsConfig {
130 pub url: String,
131 pub env: HashMap<String, String>,
132 pub options: ClientOptions,
133}
134
135impl WsConfig {
136 pub fn new(
137 url: impl Into<String>,
138 env: HashMap<String, String>,
139 options: ClientOptions,
140 ) -> Self {
141 Self {
142 url: url.into(),
143 env,
144 options,
145 }
146 }
147
148 pub fn with_url(mut self, url: impl Into<String>) -> Self {
149 self.url = url.into();
150 self
151 }
152
153 pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
154 self.env = env;
155 self
156 }
157}
158
159impl Default for WsConfig {
160 fn default() -> Self {
161 Self {
162 url: String::from("ws://127.0.0.1:4222"),
163 env: HashMap::new(),
164 options: ClientOptions::default(),
165 }
166 }
167}
168
169struct Inner {
170 outbound: mpsc::Sender<Value>,
171 pending: Mutex<PendingMap>,
172 default_timeout: Duration,
173 initialized: AtomicBool,
174 ready: AtomicBool,
175 next_id: AtomicI64,
176 event_tx: broadcast::Sender<ServerEvent>,
177 event_rx: Mutex<broadcast::Receiver<ServerEvent>>,
178 refresh_handler: RwLock<Option<RefreshHandler>>,
179 apply_patch_approval_handler: RwLock<Option<ApplyPatchApprovalHandler>>,
180 exec_command_approval_handler: RwLock<Option<ExecCommandApprovalHandler>>,
181 command_execution_request_approval_handler:
182 RwLock<Option<CommandExecutionRequestApprovalHandler>>,
183 file_change_request_approval_handler: RwLock<Option<FileChangeRequestApprovalHandler>>,
184 tool_request_user_input_handler: RwLock<Option<ToolRequestUserInputHandler>>,
185 dynamic_tool_call_handler: RwLock<Option<DynamicToolCallHandler>>,
186}
187
188#[derive(Clone)]
189pub struct CodexClient {
190 inner: Arc<Inner>,
191}
192
193macro_rules! typed_method {
194 ($fn_name:ident, $method:literal, $params_ty:ty, $result_ty:ty) => {
195 pub async fn $fn_name(&self, params: $params_ty) -> Result<$result_ty, ClientError> {
196 self.request_typed_internal($method, params, None, true)
197 .await
198 }
199 };
200}
201
202macro_rules! typed_null_method {
203 ($fn_name:ident, $method:literal, $result_ty:ty) => {
204 pub async fn $fn_name(&self) -> Result<$result_ty, ClientError> {
205 self.request_typed_value_internal($method, Value::Null, None, true)
206 .await
207 }
208 };
209}
210
211impl CodexClient {
212 pub async fn spawn_stdio(config: StdioConfig) -> Result<Self, ClientError> {
213 let handle = spawn_stdio_transport(&config.codex_binary, &config.args, &config.env).await?;
214 Ok(Self::from_transport(handle, config.options.default_timeout))
215 }
216
217 pub async fn connect_ws(config: WsConfig) -> Result<Self, ClientError> {
218 let handle = connect_ws_transport(&config.url).await?;
219 Ok(Self::from_transport(handle, config.options.default_timeout))
220 }
221
222 pub async fn start_and_connect_ws(config: WsConfig) -> Result<Self, ClientError> {
223 ensure_local_ws_app_server(&config.url, &config.env).await?;
224
225 let handle = connect_ws_transport(&config.url).await?;
226 Ok(Self::from_transport(handle, config.options.default_timeout))
227 }
228
229 fn from_transport(handle: TransportHandle, default_timeout: Duration) -> Self {
230 let (event_tx, event_rx) = broadcast::channel(1024);
231 let inner = Arc::new(Inner {
232 outbound: handle.outbound,
233 pending: Mutex::new(HashMap::new()),
234 default_timeout,
235 initialized: AtomicBool::new(false),
236 ready: AtomicBool::new(false),
237 next_id: AtomicI64::new(1),
238 event_tx,
239 event_rx: Mutex::new(event_rx),
240 refresh_handler: RwLock::new(None),
241 apply_patch_approval_handler: RwLock::new(None),
242 exec_command_approval_handler: RwLock::new(None),
243 command_execution_request_approval_handler: RwLock::new(None),
244 file_change_request_approval_handler: RwLock::new(None),
245 tool_request_user_input_handler: RwLock::new(None),
246 dynamic_tool_call_handler: RwLock::new(None),
247 });
248
249 tokio::spawn(run_inbound_loop(handle.inbound, inner.clone()));
250 Self { inner }
251 }
252
253 pub fn as_api(&self) -> Codex {
254 Codex::from_client(self.clone())
255 }
256
257 pub fn start_thread(&self, options: ThreadOptions) -> Thread {
258 self.as_api().start_thread(options)
259 }
260
261 pub fn resume_thread(&self, id: impl Into<String>, options: ThreadOptions) -> Thread {
262 self.as_api().resume_thread(id, options)
263 }
264
265 pub fn subscribe(&self) -> broadcast::Receiver<ServerEvent> {
266 self.inner.event_tx.subscribe()
267 }
268
269 pub async fn next_event(&self) -> Result<ServerEvent, ClientError> {
270 let mut rx = self.inner.event_rx.lock().await;
271 rx.recv().await.map_err(|err| {
272 ClientError::TransportSend(format!("event channel receive failed: {err}"))
273 })
274 }
275
276 pub async fn set_chatgpt_auth_tokens_refresh_handler<F, Fut>(&self, handler: F)
277 where
278 F: Fn(server_requests::ChatgptAuthTokensRefreshParams) -> Fut + Send + Sync + 'static,
279 Fut: Future<Output = Result<server_requests::ChatgptAuthTokensRefreshResponse, ClientError>>
280 + Send
281 + 'static,
282 {
283 let wrapped: RefreshHandler = Arc::new(move |params| Box::pin(handler(params)));
284 *self.inner.refresh_handler.write().await = Some(wrapped);
285 }
286
287 pub async fn clear_chatgpt_auth_tokens_refresh_handler(&self) {
288 *self.inner.refresh_handler.write().await = None;
289 }
290
291 pub async fn set_apply_patch_approval_handler<F, Fut>(&self, handler: F)
292 where
293 F: Fn(server_requests::ApplyPatchApprovalParams) -> Fut + Send + Sync + 'static,
294 Fut: Future<Output = Result<server_requests::ApplyPatchApprovalResponse, ClientError>>
295 + Send
296 + 'static,
297 {
298 let wrapped: ApplyPatchApprovalHandler = Arc::new(move |params| Box::pin(handler(params)));
299 *self.inner.apply_patch_approval_handler.write().await = Some(wrapped);
300 }
301
302 pub async fn clear_apply_patch_approval_handler(&self) {
303 *self.inner.apply_patch_approval_handler.write().await = None;
304 }
305
306 pub async fn set_exec_command_approval_handler<F, Fut>(&self, handler: F)
307 where
308 F: Fn(server_requests::ExecCommandApprovalParams) -> Fut + Send + Sync + 'static,
309 Fut: Future<Output = Result<server_requests::ExecCommandApprovalResponse, ClientError>>
310 + Send
311 + 'static,
312 {
313 let wrapped: ExecCommandApprovalHandler = Arc::new(move |params| Box::pin(handler(params)));
314 *self.inner.exec_command_approval_handler.write().await = Some(wrapped);
315 }
316
317 pub async fn clear_exec_command_approval_handler(&self) {
318 *self.inner.exec_command_approval_handler.write().await = None;
319 }
320
321 pub async fn set_command_execution_request_approval_handler<F, Fut>(&self, handler: F)
322 where
323 F: Fn(server_requests::CommandExecutionRequestApprovalParams) -> Fut
324 + Send
325 + Sync
326 + 'static,
327 Fut: Future<
328 Output = Result<
329 server_requests::CommandExecutionRequestApprovalResponse,
330 ClientError,
331 >,
332 > + Send
333 + 'static,
334 {
335 let wrapped: CommandExecutionRequestApprovalHandler =
336 Arc::new(move |params| Box::pin(handler(params)));
337 *self
338 .inner
339 .command_execution_request_approval_handler
340 .write()
341 .await = Some(wrapped);
342 }
343
344 pub async fn clear_command_execution_request_approval_handler(&self) {
345 *self
346 .inner
347 .command_execution_request_approval_handler
348 .write()
349 .await = None;
350 }
351
352 pub async fn set_file_change_request_approval_handler<F, Fut>(&self, handler: F)
353 where
354 F: Fn(server_requests::FileChangeRequestApprovalParams) -> Fut + Send + Sync + 'static,
355 Fut: Future<Output = Result<server_requests::FileChangeRequestApprovalResponse, ClientError>>
356 + Send
357 + 'static,
358 {
359 let wrapped: FileChangeRequestApprovalHandler =
360 Arc::new(move |params| Box::pin(handler(params)));
361 *self
362 .inner
363 .file_change_request_approval_handler
364 .write()
365 .await = Some(wrapped);
366 }
367
368 pub async fn clear_file_change_request_approval_handler(&self) {
369 *self
370 .inner
371 .file_change_request_approval_handler
372 .write()
373 .await = None;
374 }
375
376 pub async fn set_tool_request_user_input_handler<F, Fut>(&self, handler: F)
377 where
378 F: Fn(server_requests::ToolRequestUserInputParams) -> Fut + Send + Sync + 'static,
379 Fut: Future<Output = Result<server_requests::ToolRequestUserInputResponse, ClientError>>
380 + Send
381 + 'static,
382 {
383 let wrapped: ToolRequestUserInputHandler =
384 Arc::new(move |params| Box::pin(handler(params)));
385 *self.inner.tool_request_user_input_handler.write().await = Some(wrapped);
386 }
387
388 pub async fn clear_tool_request_user_input_handler(&self) {
389 *self.inner.tool_request_user_input_handler.write().await = None;
390 }
391
392 pub async fn set_dynamic_tool_call_handler<F, Fut>(&self, handler: F)
393 where
394 F: Fn(server_requests::DynamicToolCallParams) -> Fut + Send + Sync + 'static,
395 Fut: Future<Output = Result<server_requests::DynamicToolCallResponse, ClientError>>
396 + Send
397 + 'static,
398 {
399 let wrapped: DynamicToolCallHandler = Arc::new(move |params| Box::pin(handler(params)));
400 *self.inner.dynamic_tool_call_handler.write().await = Some(wrapped);
401 }
402
403 pub async fn clear_dynamic_tool_call_handler(&self) {
404 *self.inner.dynamic_tool_call_handler.write().await = None;
405 }
406
407 pub async fn initialize(
408 &self,
409 params: requests::InitializeParams,
410 ) -> Result<responses::InitializeResult, ClientError> {
411 if self.inner.initialized.load(Ordering::SeqCst) {
412 return Err(ClientError::AlreadyInitialized);
413 }
414
415 let result: responses::InitializeResult = self
416 .request_typed_internal("initialize", params, None, false)
417 .await?;
418
419 self.inner.initialized.store(true, Ordering::SeqCst);
420 Ok(result)
421 }
422
423 pub async fn initialized(&self) -> Result<(), ClientError> {
424 if !self.inner.initialized.load(Ordering::SeqCst) {
425 return Err(ClientError::NotInitialized {
426 method: "initialized".to_string(),
427 });
428 }
429 self.send_notification("initialized", EmptyObject::default(), false)
430 .await?;
431 self.inner.ready.store(true, Ordering::SeqCst);
432 Ok(())
433 }
434
435 pub async fn send_raw_request(
436 &self,
437 method: impl Into<String>,
438 params: Value,
439 timeout: Option<Duration>,
440 ) -> Result<Value, ClientError> {
441 let method = method.into();
442 let requires_ready = method != "initialize";
443 self.request_value_internal(&method, params, timeout, requires_ready)
444 .await
445 }
446
447 pub async fn send_raw_notification(
448 &self,
449 method: impl Into<String>,
450 params: Value,
451 ) -> Result<(), ClientError> {
452 let method = method.into();
453 let requires_ready = method != "initialized";
454 self.send_notification(&method, params, requires_ready)
455 .await
456 }
457
458 pub async fn respond_server_request<R: Serialize>(
459 &self,
460 id: RequestId,
461 result: R,
462 ) -> Result<(), ClientError> {
463 let result = serde_json::to_value(result)?;
464 self.send_message(json!({ "id": id, "result": result }))
465 .await
466 }
467
468 pub async fn respond_server_request_error(
469 &self,
470 id: RequestId,
471 error: RpcError,
472 ) -> Result<(), ClientError> {
473 self.send_message(json!({ "id": id, "error": error })).await
474 }
475
476 pub async fn respond_chatgpt_auth_tokens_refresh(
477 &self,
478 id: RequestId,
479 response: server_requests::ChatgptAuthTokensRefreshResponse,
480 ) -> Result<(), ClientError> {
481 self.respond_server_request(id, response).await
482 }
483
484 pub async fn respond_apply_patch_approval(
485 &self,
486 id: RequestId,
487 response: server_requests::ApplyPatchApprovalResponse,
488 ) -> Result<(), ClientError> {
489 self.respond_server_request(id, response).await
490 }
491
492 pub async fn respond_exec_command_approval(
493 &self,
494 id: RequestId,
495 response: server_requests::ExecCommandApprovalResponse,
496 ) -> Result<(), ClientError> {
497 self.respond_server_request(id, response).await
498 }
499
500 pub async fn respond_command_execution_request_approval(
501 &self,
502 id: RequestId,
503 response: server_requests::CommandExecutionRequestApprovalResponse,
504 ) -> Result<(), ClientError> {
505 self.respond_server_request(id, response).await
506 }
507
508 pub async fn respond_file_change_request_approval(
509 &self,
510 id: RequestId,
511 response: server_requests::FileChangeRequestApprovalResponse,
512 ) -> Result<(), ClientError> {
513 self.respond_server_request(id, response).await
514 }
515
516 pub async fn respond_tool_request_user_input(
517 &self,
518 id: RequestId,
519 response: server_requests::ToolRequestUserInputResponse,
520 ) -> Result<(), ClientError> {
521 self.respond_server_request(id, response).await
522 }
523
524 pub async fn respond_dynamic_tool_call(
525 &self,
526 id: RequestId,
527 response: server_requests::DynamicToolCallResponse,
528 ) -> Result<(), ClientError> {
529 self.respond_server_request(id, response).await
530 }
531
532 typed_method!(
533 thread_start,
534 "thread/start",
535 requests::ThreadStartParams,
536 responses::ThreadResult
537 );
538 typed_method!(
539 thread_resume,
540 "thread/resume",
541 requests::ThreadResumeParams,
542 responses::ThreadResult
543 );
544 typed_method!(
545 thread_fork,
546 "thread/fork",
547 requests::ThreadForkParams,
548 responses::ThreadResult
549 );
550 typed_method!(
551 thread_archive,
552 "thread/archive",
553 requests::ThreadArchiveParams,
554 responses::ThreadArchiveResult
555 );
556 typed_method!(
557 thread_name_set,
558 "thread/name/set",
559 requests::ThreadSetNameParams,
560 responses::ThreadSetNameResult
561 );
562 typed_method!(
563 thread_unarchive,
564 "thread/unarchive",
565 requests::ThreadUnarchiveParams,
566 responses::ThreadUnarchiveResult
567 );
568 typed_method!(
569 thread_compact_start,
570 "thread/compact/start",
571 requests::ThreadCompactStartParams,
572 responses::ThreadCompactStartResult
573 );
574 typed_method!(
575 thread_background_terminals_clean,
576 "thread/backgroundTerminals/clean",
577 requests::ThreadBackgroundTerminalsCleanParams,
578 responses::ThreadBackgroundTerminalsCleanResult
579 );
580 typed_method!(
581 thread_rollback,
582 "thread/rollback",
583 requests::ThreadRollbackParams,
584 responses::ThreadRollbackResult
585 );
586 typed_method!(
587 thread_list,
588 "thread/list",
589 requests::ThreadListParams,
590 responses::ThreadListResult
591 );
592 typed_method!(
593 thread_loaded_list,
594 "thread/loaded/list",
595 requests::ThreadLoadedListParams,
596 responses::ThreadLoadedListResult
597 );
598 typed_method!(
599 thread_read,
600 "thread/read",
601 requests::ThreadReadParams,
602 responses::ThreadReadResult
603 );
604 typed_method!(
605 skills_list,
606 "skills/list",
607 requests::SkillsListParams,
608 responses::SkillsListResult
609 );
610 typed_method!(
611 skills_remote_list,
612 "skills/remote/list",
613 requests::SkillsRemoteReadParams,
614 responses::SkillsRemoteReadResult
615 );
616 typed_method!(
617 skills_remote_export,
618 "skills/remote/export",
619 requests::SkillsRemoteWriteParams,
620 responses::SkillsRemoteWriteResult
621 );
622 typed_method!(
623 app_list,
624 "app/list",
625 requests::AppsListParams,
626 responses::AppsListResult
627 );
628 typed_method!(
629 skills_config_write,
630 "skills/config/write",
631 requests::SkillsConfigWriteParams,
632 responses::SkillsConfigWriteResult
633 );
634 typed_method!(
635 turn_start,
636 "turn/start",
637 requests::TurnStartParams,
638 responses::TurnResult
639 );
640 typed_method!(
641 turn_steer,
642 "turn/steer",
643 requests::TurnSteerParams,
644 responses::TurnSteerResult
645 );
646 typed_method!(
647 turn_interrupt,
648 "turn/interrupt",
649 requests::TurnInterruptParams,
650 EmptyObject
651 );
652 typed_method!(
653 review_start,
654 "review/start",
655 requests::ReviewStartParams,
656 responses::ReviewStartResult
657 );
658 typed_method!(
659 model_list,
660 "model/list",
661 requests::ModelListParams,
662 responses::ModelListResult
663 );
664 typed_method!(
665 experimental_feature_list,
666 "experimentalFeature/list",
667 requests::ExperimentalFeatureListParams,
668 responses::ExperimentalFeatureListResult
669 );
670 typed_method!(
671 collaboration_mode_list,
672 "collaborationMode/list",
673 requests::CollaborationModeListParams,
674 responses::CollaborationModeListResult
675 );
676 typed_method!(
677 mock_experimental_method,
678 "mock/experimentalMethod",
679 requests::MockExperimentalMethodParams,
680 responses::MockExperimentalMethodResult
681 );
682 typed_method!(
683 mcp_server_oauth_login,
684 "mcpServer/oauth/login",
685 requests::McpServerOauthLoginParams,
686 responses::McpServerOauthLoginResult
687 );
688 typed_method!(
689 mcp_server_status_list,
690 "mcpServerStatus/list",
691 requests::ListMcpServerStatusParams,
692 responses::McpServerStatusListResult
693 );
694 typed_method!(
695 windows_sandbox_setup_start,
696 "windowsSandbox/setupStart",
697 requests::WindowsSandboxSetupStartParams,
698 responses::WindowsSandboxSetupStartResult
699 );
700 typed_method!(
701 account_login_start,
702 "account/login/start",
703 requests::LoginAccountParams,
704 responses::LoginAccountResult
705 );
706 typed_method!(
707 account_login_cancel,
708 "account/login/cancel",
709 requests::CancelLoginAccountParams,
710 EmptyObject
711 );
712 typed_method!(
713 feedback_upload,
714 "feedback/upload",
715 requests::FeedbackUploadParams,
716 responses::FeedbackUploadResult
717 );
718 typed_method!(
719 command_exec,
720 "command/exec",
721 requests::CommandExecParams,
722 responses::CommandExecResult
723 );
724 typed_method!(
725 config_read,
726 "config/read",
727 requests::ConfigReadParams,
728 responses::ConfigReadResult
729 );
730 typed_method!(
731 config_value_write,
732 "config/value/write",
733 requests::ConfigValueWriteParams,
734 responses::ConfigValueWriteResult
735 );
736 typed_method!(
737 config_batch_write,
738 "config/batchWrite",
739 requests::ConfigBatchWriteParams,
740 responses::ConfigBatchWriteResult
741 );
742 typed_method!(
743 account_read,
744 "account/read",
745 requests::GetAccountParams,
746 responses::GetAccountResult
747 );
748 typed_method!(
749 fuzzy_file_search_session_start,
750 "fuzzyFileSearch/sessionStart",
751 requests::FuzzyFileSearchSessionStartParams,
752 responses::FuzzyFileSearchSessionStartResult
753 );
754 typed_method!(
755 fuzzy_file_search_session_update,
756 "fuzzyFileSearch/sessionUpdate",
757 requests::FuzzyFileSearchSessionUpdateParams,
758 responses::FuzzyFileSearchSessionUpdateResult
759 );
760 typed_method!(
761 fuzzy_file_search_session_stop,
762 "fuzzyFileSearch/sessionStop",
763 requests::FuzzyFileSearchSessionStopParams,
764 responses::FuzzyFileSearchSessionStopResult
765 );
766
767 pub async fn skills_remote_read(
769 &self,
770 params: requests::SkillsRemoteReadParams,
771 ) -> Result<responses::SkillsRemoteReadResult, ClientError> {
772 self.skills_remote_list(params).await
773 }
774
775 pub async fn skills_remote_write(
776 &self,
777 params: requests::SkillsRemoteWriteParams,
778 ) -> Result<responses::SkillsRemoteWriteResult, ClientError> {
779 self.skills_remote_export(params).await
780 }
781
782 typed_null_method!(
783 config_mcp_server_reload,
784 "config/mcpServer/reload",
785 EmptyObject
786 );
787 typed_null_method!(account_logout, "account/logout", EmptyObject);
788 typed_null_method!(
789 account_rate_limits_read,
790 "account/rateLimits/read",
791 responses::AccountRateLimitsReadResult
792 );
793 typed_null_method!(
794 config_requirements_read,
795 "configRequirements/read",
796 responses::ConfigRequirementsReadResult
797 );
798
799 async fn send_notification<P: Serialize>(
800 &self,
801 method: &str,
802 params: P,
803 requires_ready: bool,
804 ) -> Result<(), ClientError> {
805 if requires_ready && !self.inner.ready.load(Ordering::SeqCst) {
806 return Err(ClientError::NotReady {
807 method: method.to_string(),
808 });
809 }
810
811 let value = serde_json::to_value(params)?;
812 self.send_message(json!({ "method": method, "params": value }))
813 .await
814 }
815
816 async fn request_typed_internal<P, R>(
817 &self,
818 method: &str,
819 params: P,
820 timeout: Option<Duration>,
821 requires_ready: bool,
822 ) -> Result<R, ClientError>
823 where
824 P: Serialize,
825 R: serde::de::DeserializeOwned,
826 {
827 let value = serde_json::to_value(params)?;
828 self.request_typed_value_internal(method, value, timeout, requires_ready)
829 .await
830 }
831
832 async fn request_typed_value_internal<R>(
833 &self,
834 method: &str,
835 params: Value,
836 timeout: Option<Duration>,
837 requires_ready: bool,
838 ) -> Result<R, ClientError>
839 where
840 R: serde::de::DeserializeOwned,
841 {
842 let raw = self
843 .request_value_internal(method, params, timeout, requires_ready)
844 .await?;
845
846 serde_json::from_value(raw).map_err(|source| ClientError::UnexpectedResult {
847 method: method.to_string(),
848 source,
849 })
850 }
851
852 async fn request_value_internal(
853 &self,
854 method: &str,
855 params: Value,
856 timeout: Option<Duration>,
857 requires_ready: bool,
858 ) -> Result<Value, ClientError> {
859 if requires_ready && !self.inner.ready.load(Ordering::SeqCst) {
860 return Err(ClientError::NotReady {
861 method: method.to_string(),
862 });
863 }
864
865 if method == "initialize" && self.inner.initialized.load(Ordering::SeqCst) {
866 return Err(ClientError::AlreadyInitialized);
867 }
868
869 let id_num = self.inner.next_id.fetch_add(1, Ordering::SeqCst);
870 let id = RequestId::Integer(id_num);
871
872 let request = json!({
873 "method": method,
874 "id": id,
875 "params": params,
876 });
877
878 let (tx, rx) = oneshot::channel();
879 self.inner.pending.lock().await.insert(id.clone(), tx);
880
881 if let Err(err) = self.send_message(request).await {
882 self.inner.pending.lock().await.remove(&id);
883 return Err(err);
884 }
885
886 let timeout = timeout.unwrap_or(self.inner.default_timeout);
887 match tokio::time::timeout(timeout, rx).await {
888 Ok(Ok(Ok(value))) => Ok(value),
889 Ok(Ok(Err(error))) => Err(ClientError::Rpc { error }),
890 Ok(Err(_)) => Err(ClientError::TransportClosed),
891 Err(_) => {
892 self.inner.pending.lock().await.remove(&id);
893 Err(ClientError::Timeout {
894 method: method.to_string(),
895 timeout_ms: timeout.as_millis() as u64,
896 })
897 }
898 }
899 }
900
901 async fn send_message(&self, value: Value) -> Result<(), ClientError> {
902 self.inner.outbound.send(value).await.map_err(|err| {
903 ClientError::TransportSend(format!("failed to send outbound frame: {err}"))
904 })
905 }
906}
907
908async fn run_inbound_loop(
909 mut inbound: mpsc::Receiver<Result<Value, ClientError>>,
910 inner: Arc<Inner>,
911) {
912 while let Some(frame) = inbound.recv().await {
913 match frame {
914 Ok(value) => {
915 if let Err(err) = process_incoming_value(value, &inner).await {
916 fail_all_pending(&inner, &format!("processing inbound frame failed: {err}"))
917 .await;
918 let _ = inner.event_tx.send(ServerEvent::TransportClosed);
919 break;
920 }
921 }
922 Err(err) => {
923 fail_all_pending(&inner, &format!("transport error: {err}")).await;
924 let _ = inner.event_tx.send(ServerEvent::TransportClosed);
925 break;
926 }
927 }
928 }
929}
930
931async fn process_incoming_value(value: Value, inner: &Arc<Inner>) -> Result<(), ClientError> {
932 match classify_incoming(value)? {
933 IncomingClassified::Response { id, result } => {
934 if let Some(sender) = inner.pending.lock().await.remove(&id) {
935 let _ = sender.send(result);
936 }
937 }
938 IncomingClassified::Notification {
939 method,
940 params,
941 raw: _,
942 } => {
943 let parsed = parse_notification(method.clone(), params.clone())
944 .unwrap_or(ServerNotification::Unknown { method, params });
945 let _ = inner.event_tx.send(ServerEvent::Notification(parsed));
946 }
947 IncomingClassified::ServerRequest {
948 id,
949 method,
950 params,
951 raw: _,
952 } => {
953 let parsed = parse_server_request(id.clone(), method.clone(), params.clone())
954 .unwrap_or(ServerRequestEvent::Unknown { id, method, params });
955 if !try_auto_handle_server_request(inner, &parsed).await {
956 let _ = inner.event_tx.send(ServerEvent::ServerRequest(parsed));
957 }
958 }
959 }
960 Ok(())
961}
962
963async fn try_auto_handle_server_request(inner: &Arc<Inner>, request: &ServerRequestEvent) -> bool {
964 match request {
965 ServerRequestEvent::ChatgptAuthTokensRefresh { id, params } => {
966 let handler = inner.refresh_handler.read().await.clone();
967 let Some(handler) = handler else {
968 return false;
969 };
970
971 let response = handler(params.clone()).await;
972 send_server_request_handler_result(inner, id, response, "chatgptAuthTokens refresh")
973 .await
974 }
975 ServerRequestEvent::ApplyPatchApproval { id, params } => {
976 let handler = inner.apply_patch_approval_handler.read().await.clone();
977 let Some(handler) = handler else {
978 return false;
979 };
980
981 let response = handler(params.clone()).await;
982 send_server_request_handler_result(inner, id, response, "applyPatchApproval").await
983 }
984 ServerRequestEvent::ExecCommandApproval { id, params } => {
985 let handler = inner.exec_command_approval_handler.read().await.clone();
986 let Some(handler) = handler else {
987 return false;
988 };
989
990 let response = handler(params.clone()).await;
991 send_server_request_handler_result(inner, id, response, "execCommandApproval").await
992 }
993 ServerRequestEvent::CommandExecutionRequestApproval { id, params } => {
994 let handler = inner
995 .command_execution_request_approval_handler
996 .read()
997 .await
998 .clone();
999 let Some(handler) = handler else {
1000 return false;
1001 };
1002
1003 let response = handler(params.clone()).await;
1004 send_server_request_handler_result(
1005 inner,
1006 id,
1007 response,
1008 "item/commandExecution/requestApproval",
1009 )
1010 .await
1011 }
1012 ServerRequestEvent::FileChangeRequestApproval { id, params } => {
1013 let handler = inner
1014 .file_change_request_approval_handler
1015 .read()
1016 .await
1017 .clone();
1018 let Some(handler) = handler else {
1019 return false;
1020 };
1021
1022 let response = handler(params.clone()).await;
1023 send_server_request_handler_result(
1024 inner,
1025 id,
1026 response,
1027 "item/fileChange/requestApproval",
1028 )
1029 .await
1030 }
1031 ServerRequestEvent::ToolRequestUserInput { id, params } => {
1032 let handler = inner.tool_request_user_input_handler.read().await.clone();
1033 let Some(handler) = handler else {
1034 return false;
1035 };
1036
1037 let response = handler(params.clone()).await;
1038 send_server_request_handler_result(inner, id, response, "item/tool/requestUserInput")
1039 .await
1040 }
1041 ServerRequestEvent::DynamicToolCall { id, params } => {
1042 let handler = inner.dynamic_tool_call_handler.read().await.clone();
1043 let Some(handler) = handler else {
1044 return false;
1045 };
1046
1047 let response = handler(params.clone()).await;
1048 send_server_request_handler_result(inner, id, response, "item/tool/call").await
1049 }
1050 _ => false,
1051 }
1052}
1053
1054async fn send_server_request_handler_result<R: Serialize>(
1055 inner: &Arc<Inner>,
1056 id: &RequestId,
1057 response: Result<R, ClientError>,
1058 context: &str,
1059) -> bool {
1060 let payload = match response {
1061 Ok(result) => json!({ "id": id, "result": result }),
1062 Err(err) => json!({
1063 "id": id,
1064 "error": {
1065 "code": -32001,
1066 "message": format!("{context} handler failed: {err}")
1067 }
1068 }),
1069 };
1070
1071 if inner.outbound.send(payload).await.is_err() {
1072 let _ = inner.event_tx.send(ServerEvent::TransportClosed);
1073 }
1074
1075 true
1076}
1077
1078async fn fail_all_pending(inner: &Arc<Inner>, message: &str) {
1079 let mut pending = inner.pending.lock().await;
1080 let entries = std::mem::take(&mut *pending);
1081 drop(pending);
1082
1083 for (_, sender) in entries {
1084 let _ = sender.send(Err(RpcError {
1085 code: -32098,
1086 message: message.to_string(),
1087 data: None,
1088 }));
1089 }
1090}
1091
1092#[cfg(test)]
1093mod tests {
1094 use super::*;
1095 use tokio::time::{Duration, timeout};
1096
1097 fn test_client() -> (
1098 CodexClient,
1099 mpsc::Sender<Result<Value, ClientError>>,
1100 mpsc::Receiver<Value>,
1101 ) {
1102 let (transport_outbound_tx, transport_outbound_rx) = mpsc::channel::<Value>(32);
1103 let (transport_inbound_tx, transport_inbound_rx) =
1104 mpsc::channel::<Result<Value, ClientError>>(32);
1105 let client = CodexClient::from_transport(
1106 TransportHandle {
1107 outbound: transport_outbound_tx,
1108 inbound: transport_inbound_rx,
1109 },
1110 Duration::from_secs(5),
1111 );
1112 (client, transport_inbound_tx, transport_outbound_rx)
1113 }
1114
1115 #[tokio::test]
1116 async fn auto_handles_apply_patch_approval_when_handler_registered() {
1117 let (client, inbound_tx, mut outbound_rx) = test_client();
1118
1119 client
1120 .set_apply_patch_approval_handler(|_| async {
1121 let mut response = server_requests::ApplyPatchApprovalResponse::default();
1122 response
1123 .extra
1124 .insert("decision".to_string(), Value::String("approve".to_string()));
1125 Ok(response)
1126 })
1127 .await;
1128
1129 inbound_tx
1130 .send(Ok(json!({
1131 "id": 42,
1132 "method": "applyPatchApproval",
1133 "params": {}
1134 })))
1135 .await
1136 .expect("send inbound server request");
1137
1138 let outbound = timeout(Duration::from_secs(2), outbound_rx.recv())
1139 .await
1140 .expect("timed out waiting for outbound response")
1141 .expect("expected outbound response frame");
1142 assert_eq!(outbound.get("id"), Some(&json!(42)));
1143 assert_eq!(
1144 outbound.pointer("/result/decision"),
1145 Some(&Value::String("approve".to_string()))
1146 );
1147 }
1148
1149 #[tokio::test]
1150 async fn unhandled_server_request_is_published_as_event() {
1151 let (client, inbound_tx, mut outbound_rx) = test_client();
1152
1153 inbound_tx
1154 .send(Ok(json!({
1155 "id": 7,
1156 "method": "applyPatchApproval",
1157 "params": {}
1158 })))
1159 .await
1160 .expect("send inbound server request");
1161
1162 let event = timeout(Duration::from_secs(2), client.next_event())
1163 .await
1164 .expect("timed out waiting for event")
1165 .expect("event receive");
1166
1167 match event {
1168 ServerEvent::ServerRequest(ServerRequestEvent::ApplyPatchApproval { id, .. }) => {
1169 assert_eq!(id, RequestId::Integer(7));
1170 }
1171 other => panic!("unexpected event: {other:?}"),
1172 }
1173
1174 assert!(
1175 timeout(Duration::from_millis(200), outbound_rx.recv())
1176 .await
1177 .is_err(),
1178 "did not expect auto-response when handler is absent"
1179 );
1180 }
1181}