Skip to main content

codex_app_server_sdk/client/
mod.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
6use std::time::Duration;
7
8use serde::Serialize;
9use serde_json::{Value, json};
10use tokio::sync::{Mutex, RwLock, broadcast, mpsc, oneshot};
11
12use crate::api::{Codex, Thread, ThreadOptions};
13use crate::error::{ClientError, IncomingClassified, RpcError, classify_incoming};
14use crate::events::{
15    ServerEvent, ServerNotification, ServerRequestEvent, parse_notification, parse_server_request,
16};
17use crate::protocol::requests;
18use crate::protocol::responses;
19use crate::protocol::server_requests;
20use crate::protocol::shared::{EmptyObject, RequestId};
21use crate::transport::TransportHandle;
22use crate::transport::stdio::spawn_stdio_transport;
23use crate::transport::ws::connect_ws_transport;
24use crate::transport::ws_daemon::ensure_local_ws_app_server;
25
26type PendingMap = HashMap<RequestId, oneshot::Sender<Result<Value, RpcError>>>;
27type RefreshFuture = Pin<
28    Box<
29        dyn Future<Output = Result<server_requests::ChatgptAuthTokensRefreshResponse, ClientError>>
30            + Send,
31    >,
32>;
33type RefreshHandler =
34    Arc<dyn Fn(server_requests::ChatgptAuthTokensRefreshParams) -> RefreshFuture + Send + Sync>;
35type ApplyPatchApprovalFuture = Pin<
36    Box<
37        dyn Future<Output = Result<server_requests::ApplyPatchApprovalResponse, ClientError>>
38            + Send,
39    >,
40>;
41type ApplyPatchApprovalHandler = Arc<
42    dyn Fn(server_requests::ApplyPatchApprovalParams) -> ApplyPatchApprovalFuture + Send + Sync,
43>;
44type ExecCommandApprovalFuture = Pin<
45    Box<
46        dyn Future<Output = Result<server_requests::ExecCommandApprovalResponse, ClientError>>
47            + Send,
48    >,
49>;
50type ExecCommandApprovalHandler = Arc<
51    dyn Fn(server_requests::ExecCommandApprovalParams) -> ExecCommandApprovalFuture + Send + Sync,
52>;
53type CommandExecutionRequestApprovalFuture = Pin<
54    Box<
55        dyn Future<
56                Output = Result<
57                    server_requests::CommandExecutionRequestApprovalResponse,
58                    ClientError,
59                >,
60            > + Send,
61    >,
62>;
63type CommandExecutionRequestApprovalHandler = Arc<
64    dyn Fn(
65            server_requests::CommandExecutionRequestApprovalParams,
66        ) -> CommandExecutionRequestApprovalFuture
67        + Send
68        + Sync,
69>;
70type FileChangeRequestApprovalFuture = Pin<
71    Box<
72        dyn Future<Output = Result<server_requests::FileChangeRequestApprovalResponse, ClientError>>
73            + Send,
74    >,
75>;
76type FileChangeRequestApprovalHandler = Arc<
77    dyn Fn(server_requests::FileChangeRequestApprovalParams) -> FileChangeRequestApprovalFuture
78        + Send
79        + Sync,
80>;
81type ToolRequestUserInputFuture = Pin<
82    Box<
83        dyn Future<Output = Result<server_requests::ToolRequestUserInputResponse, ClientError>>
84            + Send,
85    >,
86>;
87type ToolRequestUserInputHandler = Arc<
88    dyn Fn(server_requests::ToolRequestUserInputParams) -> ToolRequestUserInputFuture + Send + Sync,
89>;
90type DynamicToolCallFuture = Pin<
91    Box<dyn Future<Output = Result<server_requests::DynamicToolCallResponse, ClientError>> + Send>,
92>;
93type DynamicToolCallHandler =
94    Arc<dyn Fn(server_requests::DynamicToolCallParams) -> DynamicToolCallFuture + Send + Sync>;
95
96#[derive(Debug, Clone)]
97pub struct ClientOptions {
98    pub default_timeout: Duration,
99}
100
101impl Default for ClientOptions {
102    fn default() -> Self {
103        Self {
104            default_timeout: Duration::from_secs(30),
105        }
106    }
107}
108
109#[derive(Debug, Clone)]
110pub struct StdioConfig {
111    pub codex_binary: String,
112    pub args: Vec<String>,
113    pub env: HashMap<String, String>,
114    pub options: ClientOptions,
115}
116
117impl Default for StdioConfig {
118    fn default() -> Self {
119        Self {
120            codex_binary: "codex".to_string(),
121            args: vec!["app-server".to_string()],
122            env: HashMap::new(),
123            options: ClientOptions::default(),
124        }
125    }
126}
127
128#[derive(Debug, Clone)]
129pub struct WsConfig {
130    pub url: String,
131    pub env: HashMap<String, String>,
132    pub options: ClientOptions,
133}
134
135impl WsConfig {
136    pub fn new(
137        url: impl Into<String>,
138        env: HashMap<String, String>,
139        options: ClientOptions,
140    ) -> Self {
141        Self {
142            url: url.into(),
143            env,
144            options,
145        }
146    }
147
148    pub fn with_url(mut self, url: impl Into<String>) -> Self {
149        self.url = url.into();
150        self
151    }
152
153    pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
154        self.env = env;
155        self
156    }
157}
158
159impl Default for WsConfig {
160    fn default() -> Self {
161        Self {
162            url: String::from("ws://127.0.0.1:4222"),
163            env: HashMap::new(),
164            options: ClientOptions::default(),
165        }
166    }
167}
168
169struct Inner {
170    outbound: mpsc::Sender<Value>,
171    pending: Mutex<PendingMap>,
172    default_timeout: Duration,
173    initialized: AtomicBool,
174    ready: AtomicBool,
175    next_id: AtomicI64,
176    event_tx: broadcast::Sender<ServerEvent>,
177    event_rx: Mutex<broadcast::Receiver<ServerEvent>>,
178    refresh_handler: RwLock<Option<RefreshHandler>>,
179    apply_patch_approval_handler: RwLock<Option<ApplyPatchApprovalHandler>>,
180    exec_command_approval_handler: RwLock<Option<ExecCommandApprovalHandler>>,
181    command_execution_request_approval_handler:
182        RwLock<Option<CommandExecutionRequestApprovalHandler>>,
183    file_change_request_approval_handler: RwLock<Option<FileChangeRequestApprovalHandler>>,
184    tool_request_user_input_handler: RwLock<Option<ToolRequestUserInputHandler>>,
185    dynamic_tool_call_handler: RwLock<Option<DynamicToolCallHandler>>,
186}
187
188#[derive(Clone)]
189pub struct CodexClient {
190    inner: Arc<Inner>,
191}
192
193macro_rules! typed_method {
194    ($fn_name:ident, $method:literal, $params_ty:ty, $result_ty:ty) => {
195        pub async fn $fn_name(&self, params: $params_ty) -> Result<$result_ty, ClientError> {
196            self.request_typed_internal($method, params, None, true)
197                .await
198        }
199    };
200}
201
202macro_rules! typed_null_method {
203    ($fn_name:ident, $method:literal, $result_ty:ty) => {
204        pub async fn $fn_name(&self) -> Result<$result_ty, ClientError> {
205            self.request_typed_value_internal($method, Value::Null, None, true)
206                .await
207        }
208    };
209}
210
211impl CodexClient {
212    pub async fn spawn_stdio(config: StdioConfig) -> Result<Self, ClientError> {
213        let handle = spawn_stdio_transport(&config.codex_binary, &config.args, &config.env).await?;
214        Ok(Self::from_transport(handle, config.options.default_timeout))
215    }
216
217    pub async fn connect_ws(config: WsConfig) -> Result<Self, ClientError> {
218        let handle = connect_ws_transport(&config.url).await?;
219        Ok(Self::from_transport(handle, config.options.default_timeout))
220    }
221
222    pub async fn start_and_connect_ws(config: WsConfig) -> Result<Self, ClientError> {
223        ensure_local_ws_app_server(&config.url, &config.env).await?;
224
225        let handle = connect_ws_transport(&config.url).await?;
226        Ok(Self::from_transport(handle, config.options.default_timeout))
227    }
228
229    fn from_transport(handle: TransportHandle, default_timeout: Duration) -> Self {
230        let (event_tx, event_rx) = broadcast::channel(1024);
231        let inner = Arc::new(Inner {
232            outbound: handle.outbound,
233            pending: Mutex::new(HashMap::new()),
234            default_timeout,
235            initialized: AtomicBool::new(false),
236            ready: AtomicBool::new(false),
237            next_id: AtomicI64::new(1),
238            event_tx,
239            event_rx: Mutex::new(event_rx),
240            refresh_handler: RwLock::new(None),
241            apply_patch_approval_handler: RwLock::new(None),
242            exec_command_approval_handler: RwLock::new(None),
243            command_execution_request_approval_handler: RwLock::new(None),
244            file_change_request_approval_handler: RwLock::new(None),
245            tool_request_user_input_handler: RwLock::new(None),
246            dynamic_tool_call_handler: RwLock::new(None),
247        });
248
249        tokio::spawn(run_inbound_loop(handle.inbound, inner.clone()));
250        Self { inner }
251    }
252
253    pub fn as_api(&self) -> Codex {
254        Codex::from_client(self.clone())
255    }
256
257    pub fn start_thread(&self, options: ThreadOptions) -> Thread {
258        self.as_api().start_thread(options)
259    }
260
261    pub fn resume_thread(&self, id: impl Into<String>, options: ThreadOptions) -> Thread {
262        self.as_api().resume_thread(id, options)
263    }
264
265    pub fn subscribe(&self) -> broadcast::Receiver<ServerEvent> {
266        self.inner.event_tx.subscribe()
267    }
268
269    pub async fn next_event(&self) -> Result<ServerEvent, ClientError> {
270        let mut rx = self.inner.event_rx.lock().await;
271        rx.recv().await.map_err(|err| {
272            ClientError::TransportSend(format!("event channel receive failed: {err}"))
273        })
274    }
275
276    pub async fn set_chatgpt_auth_tokens_refresh_handler<F, Fut>(&self, handler: F)
277    where
278        F: Fn(server_requests::ChatgptAuthTokensRefreshParams) -> Fut + Send + Sync + 'static,
279        Fut: Future<Output = Result<server_requests::ChatgptAuthTokensRefreshResponse, ClientError>>
280            + Send
281            + 'static,
282    {
283        let wrapped: RefreshHandler = Arc::new(move |params| Box::pin(handler(params)));
284        *self.inner.refresh_handler.write().await = Some(wrapped);
285    }
286
287    pub async fn clear_chatgpt_auth_tokens_refresh_handler(&self) {
288        *self.inner.refresh_handler.write().await = None;
289    }
290
291    pub async fn set_apply_patch_approval_handler<F, Fut>(&self, handler: F)
292    where
293        F: Fn(server_requests::ApplyPatchApprovalParams) -> Fut + Send + Sync + 'static,
294        Fut: Future<Output = Result<server_requests::ApplyPatchApprovalResponse, ClientError>>
295            + Send
296            + 'static,
297    {
298        let wrapped: ApplyPatchApprovalHandler = Arc::new(move |params| Box::pin(handler(params)));
299        *self.inner.apply_patch_approval_handler.write().await = Some(wrapped);
300    }
301
302    pub async fn clear_apply_patch_approval_handler(&self) {
303        *self.inner.apply_patch_approval_handler.write().await = None;
304    }
305
306    pub async fn set_exec_command_approval_handler<F, Fut>(&self, handler: F)
307    where
308        F: Fn(server_requests::ExecCommandApprovalParams) -> Fut + Send + Sync + 'static,
309        Fut: Future<Output = Result<server_requests::ExecCommandApprovalResponse, ClientError>>
310            + Send
311            + 'static,
312    {
313        let wrapped: ExecCommandApprovalHandler = Arc::new(move |params| Box::pin(handler(params)));
314        *self.inner.exec_command_approval_handler.write().await = Some(wrapped);
315    }
316
317    pub async fn clear_exec_command_approval_handler(&self) {
318        *self.inner.exec_command_approval_handler.write().await = None;
319    }
320
321    pub async fn set_command_execution_request_approval_handler<F, Fut>(&self, handler: F)
322    where
323        F: Fn(server_requests::CommandExecutionRequestApprovalParams) -> Fut
324            + Send
325            + Sync
326            + 'static,
327        Fut: Future<
328                Output = Result<
329                    server_requests::CommandExecutionRequestApprovalResponse,
330                    ClientError,
331                >,
332            > + Send
333            + 'static,
334    {
335        let wrapped: CommandExecutionRequestApprovalHandler =
336            Arc::new(move |params| Box::pin(handler(params)));
337        *self
338            .inner
339            .command_execution_request_approval_handler
340            .write()
341            .await = Some(wrapped);
342    }
343
344    pub async fn clear_command_execution_request_approval_handler(&self) {
345        *self
346            .inner
347            .command_execution_request_approval_handler
348            .write()
349            .await = None;
350    }
351
352    pub async fn set_file_change_request_approval_handler<F, Fut>(&self, handler: F)
353    where
354        F: Fn(server_requests::FileChangeRequestApprovalParams) -> Fut + Send + Sync + 'static,
355        Fut: Future<Output = Result<server_requests::FileChangeRequestApprovalResponse, ClientError>>
356            + Send
357            + 'static,
358    {
359        let wrapped: FileChangeRequestApprovalHandler =
360            Arc::new(move |params| Box::pin(handler(params)));
361        *self
362            .inner
363            .file_change_request_approval_handler
364            .write()
365            .await = Some(wrapped);
366    }
367
368    pub async fn clear_file_change_request_approval_handler(&self) {
369        *self
370            .inner
371            .file_change_request_approval_handler
372            .write()
373            .await = None;
374    }
375
376    pub async fn set_tool_request_user_input_handler<F, Fut>(&self, handler: F)
377    where
378        F: Fn(server_requests::ToolRequestUserInputParams) -> Fut + Send + Sync + 'static,
379        Fut: Future<Output = Result<server_requests::ToolRequestUserInputResponse, ClientError>>
380            + Send
381            + 'static,
382    {
383        let wrapped: ToolRequestUserInputHandler =
384            Arc::new(move |params| Box::pin(handler(params)));
385        *self.inner.tool_request_user_input_handler.write().await = Some(wrapped);
386    }
387
388    pub async fn clear_tool_request_user_input_handler(&self) {
389        *self.inner.tool_request_user_input_handler.write().await = None;
390    }
391
392    pub async fn set_dynamic_tool_call_handler<F, Fut>(&self, handler: F)
393    where
394        F: Fn(server_requests::DynamicToolCallParams) -> Fut + Send + Sync + 'static,
395        Fut: Future<Output = Result<server_requests::DynamicToolCallResponse, ClientError>>
396            + Send
397            + 'static,
398    {
399        let wrapped: DynamicToolCallHandler = Arc::new(move |params| Box::pin(handler(params)));
400        *self.inner.dynamic_tool_call_handler.write().await = Some(wrapped);
401    }
402
403    pub async fn clear_dynamic_tool_call_handler(&self) {
404        *self.inner.dynamic_tool_call_handler.write().await = None;
405    }
406
407    pub async fn initialize(
408        &self,
409        params: requests::InitializeParams,
410    ) -> Result<responses::InitializeResult, ClientError> {
411        if self.inner.initialized.load(Ordering::SeqCst) {
412            return Err(ClientError::AlreadyInitialized);
413        }
414
415        let result: responses::InitializeResult = self
416            .request_typed_internal("initialize", params, None, false)
417            .await?;
418
419        self.inner.initialized.store(true, Ordering::SeqCst);
420        Ok(result)
421    }
422
423    pub async fn initialized(&self) -> Result<(), ClientError> {
424        if !self.inner.initialized.load(Ordering::SeqCst) {
425            return Err(ClientError::NotInitialized {
426                method: "initialized".to_string(),
427            });
428        }
429        self.send_notification("initialized", EmptyObject::default(), false)
430            .await?;
431        self.inner.ready.store(true, Ordering::SeqCst);
432        Ok(())
433    }
434
435    pub async fn send_raw_request(
436        &self,
437        method: impl Into<String>,
438        params: Value,
439        timeout: Option<Duration>,
440    ) -> Result<Value, ClientError> {
441        let method = method.into();
442        let requires_ready = method != "initialize";
443        self.request_value_internal(&method, params, timeout, requires_ready)
444            .await
445    }
446
447    pub async fn send_raw_notification(
448        &self,
449        method: impl Into<String>,
450        params: Value,
451    ) -> Result<(), ClientError> {
452        let method = method.into();
453        let requires_ready = method != "initialized";
454        self.send_notification(&method, params, requires_ready)
455            .await
456    }
457
458    pub async fn respond_server_request<R: Serialize>(
459        &self,
460        id: RequestId,
461        result: R,
462    ) -> Result<(), ClientError> {
463        let result = serde_json::to_value(result)?;
464        self.send_message(json!({ "id": id, "result": result }))
465            .await
466    }
467
468    pub async fn respond_server_request_error(
469        &self,
470        id: RequestId,
471        error: RpcError,
472    ) -> Result<(), ClientError> {
473        self.send_message(json!({ "id": id, "error": error })).await
474    }
475
476    pub async fn respond_chatgpt_auth_tokens_refresh(
477        &self,
478        id: RequestId,
479        response: server_requests::ChatgptAuthTokensRefreshResponse,
480    ) -> Result<(), ClientError> {
481        self.respond_server_request(id, response).await
482    }
483
484    pub async fn respond_apply_patch_approval(
485        &self,
486        id: RequestId,
487        response: server_requests::ApplyPatchApprovalResponse,
488    ) -> Result<(), ClientError> {
489        self.respond_server_request(id, response).await
490    }
491
492    pub async fn respond_exec_command_approval(
493        &self,
494        id: RequestId,
495        response: server_requests::ExecCommandApprovalResponse,
496    ) -> Result<(), ClientError> {
497        self.respond_server_request(id, response).await
498    }
499
500    pub async fn respond_command_execution_request_approval(
501        &self,
502        id: RequestId,
503        response: server_requests::CommandExecutionRequestApprovalResponse,
504    ) -> Result<(), ClientError> {
505        self.respond_server_request(id, response).await
506    }
507
508    pub async fn respond_file_change_request_approval(
509        &self,
510        id: RequestId,
511        response: server_requests::FileChangeRequestApprovalResponse,
512    ) -> Result<(), ClientError> {
513        self.respond_server_request(id, response).await
514    }
515
516    pub async fn respond_tool_request_user_input(
517        &self,
518        id: RequestId,
519        response: server_requests::ToolRequestUserInputResponse,
520    ) -> Result<(), ClientError> {
521        self.respond_server_request(id, response).await
522    }
523
524    pub async fn respond_dynamic_tool_call(
525        &self,
526        id: RequestId,
527        response: server_requests::DynamicToolCallResponse,
528    ) -> Result<(), ClientError> {
529        self.respond_server_request(id, response).await
530    }
531
532    typed_method!(
533        thread_start,
534        "thread/start",
535        requests::ThreadStartParams,
536        responses::ThreadResult
537    );
538    typed_method!(
539        thread_resume,
540        "thread/resume",
541        requests::ThreadResumeParams,
542        responses::ThreadResult
543    );
544    typed_method!(
545        thread_fork,
546        "thread/fork",
547        requests::ThreadForkParams,
548        responses::ThreadResult
549    );
550    typed_method!(
551        thread_archive,
552        "thread/archive",
553        requests::ThreadArchiveParams,
554        responses::ThreadArchiveResult
555    );
556    typed_method!(
557        thread_name_set,
558        "thread/name/set",
559        requests::ThreadSetNameParams,
560        responses::ThreadSetNameResult
561    );
562    typed_method!(
563        thread_unarchive,
564        "thread/unarchive",
565        requests::ThreadUnarchiveParams,
566        responses::ThreadUnarchiveResult
567    );
568    typed_method!(
569        thread_compact_start,
570        "thread/compact/start",
571        requests::ThreadCompactStartParams,
572        responses::ThreadCompactStartResult
573    );
574    typed_method!(
575        thread_background_terminals_clean,
576        "thread/backgroundTerminals/clean",
577        requests::ThreadBackgroundTerminalsCleanParams,
578        responses::ThreadBackgroundTerminalsCleanResult
579    );
580    typed_method!(
581        thread_rollback,
582        "thread/rollback",
583        requests::ThreadRollbackParams,
584        responses::ThreadRollbackResult
585    );
586    typed_method!(
587        thread_list,
588        "thread/list",
589        requests::ThreadListParams,
590        responses::ThreadListResult
591    );
592    typed_method!(
593        thread_loaded_list,
594        "thread/loaded/list",
595        requests::ThreadLoadedListParams,
596        responses::ThreadLoadedListResult
597    );
598    typed_method!(
599        thread_read,
600        "thread/read",
601        requests::ThreadReadParams,
602        responses::ThreadReadResult
603    );
604    typed_method!(
605        skills_list,
606        "skills/list",
607        requests::SkillsListParams,
608        responses::SkillsListResult
609    );
610    typed_method!(
611        skills_remote_list,
612        "skills/remote/list",
613        requests::SkillsRemoteReadParams,
614        responses::SkillsRemoteReadResult
615    );
616    typed_method!(
617        skills_remote_export,
618        "skills/remote/export",
619        requests::SkillsRemoteWriteParams,
620        responses::SkillsRemoteWriteResult
621    );
622    typed_method!(
623        app_list,
624        "app/list",
625        requests::AppsListParams,
626        responses::AppsListResult
627    );
628    typed_method!(
629        skills_config_write,
630        "skills/config/write",
631        requests::SkillsConfigWriteParams,
632        responses::SkillsConfigWriteResult
633    );
634    typed_method!(
635        turn_start,
636        "turn/start",
637        requests::TurnStartParams,
638        responses::TurnResult
639    );
640    typed_method!(
641        turn_steer,
642        "turn/steer",
643        requests::TurnSteerParams,
644        responses::TurnSteerResult
645    );
646    typed_method!(
647        turn_interrupt,
648        "turn/interrupt",
649        requests::TurnInterruptParams,
650        EmptyObject
651    );
652    typed_method!(
653        review_start,
654        "review/start",
655        requests::ReviewStartParams,
656        responses::ReviewStartResult
657    );
658    typed_method!(
659        model_list,
660        "model/list",
661        requests::ModelListParams,
662        responses::ModelListResult
663    );
664    typed_method!(
665        experimental_feature_list,
666        "experimentalFeature/list",
667        requests::ExperimentalFeatureListParams,
668        responses::ExperimentalFeatureListResult
669    );
670    typed_method!(
671        collaboration_mode_list,
672        "collaborationMode/list",
673        requests::CollaborationModeListParams,
674        responses::CollaborationModeListResult
675    );
676    typed_method!(
677        mock_experimental_method,
678        "mock/experimentalMethod",
679        requests::MockExperimentalMethodParams,
680        responses::MockExperimentalMethodResult
681    );
682    typed_method!(
683        mcp_server_oauth_login,
684        "mcpServer/oauth/login",
685        requests::McpServerOauthLoginParams,
686        responses::McpServerOauthLoginResult
687    );
688    typed_method!(
689        mcp_server_status_list,
690        "mcpServerStatus/list",
691        requests::ListMcpServerStatusParams,
692        responses::McpServerStatusListResult
693    );
694    typed_method!(
695        windows_sandbox_setup_start,
696        "windowsSandbox/setupStart",
697        requests::WindowsSandboxSetupStartParams,
698        responses::WindowsSandboxSetupStartResult
699    );
700    typed_method!(
701        account_login_start,
702        "account/login/start",
703        requests::LoginAccountParams,
704        responses::LoginAccountResult
705    );
706    typed_method!(
707        account_login_cancel,
708        "account/login/cancel",
709        requests::CancelLoginAccountParams,
710        EmptyObject
711    );
712    typed_method!(
713        feedback_upload,
714        "feedback/upload",
715        requests::FeedbackUploadParams,
716        responses::FeedbackUploadResult
717    );
718    typed_method!(
719        command_exec,
720        "command/exec",
721        requests::CommandExecParams,
722        responses::CommandExecResult
723    );
724    typed_method!(
725        config_read,
726        "config/read",
727        requests::ConfigReadParams,
728        responses::ConfigReadResult
729    );
730    typed_method!(
731        config_value_write,
732        "config/value/write",
733        requests::ConfigValueWriteParams,
734        responses::ConfigValueWriteResult
735    );
736    typed_method!(
737        config_batch_write,
738        "config/batchWrite",
739        requests::ConfigBatchWriteParams,
740        responses::ConfigBatchWriteResult
741    );
742    typed_method!(
743        account_read,
744        "account/read",
745        requests::GetAccountParams,
746        responses::GetAccountResult
747    );
748    typed_method!(
749        fuzzy_file_search_session_start,
750        "fuzzyFileSearch/sessionStart",
751        requests::FuzzyFileSearchSessionStartParams,
752        responses::FuzzyFileSearchSessionStartResult
753    );
754    typed_method!(
755        fuzzy_file_search_session_update,
756        "fuzzyFileSearch/sessionUpdate",
757        requests::FuzzyFileSearchSessionUpdateParams,
758        responses::FuzzyFileSearchSessionUpdateResult
759    );
760    typed_method!(
761        fuzzy_file_search_session_stop,
762        "fuzzyFileSearch/sessionStop",
763        requests::FuzzyFileSearchSessionStopParams,
764        responses::FuzzyFileSearchSessionStopResult
765    );
766
767    // Backward-compatible aliases for previous method names.
768    pub async fn skills_remote_read(
769        &self,
770        params: requests::SkillsRemoteReadParams,
771    ) -> Result<responses::SkillsRemoteReadResult, ClientError> {
772        self.skills_remote_list(params).await
773    }
774
775    pub async fn skills_remote_write(
776        &self,
777        params: requests::SkillsRemoteWriteParams,
778    ) -> Result<responses::SkillsRemoteWriteResult, ClientError> {
779        self.skills_remote_export(params).await
780    }
781
782    typed_null_method!(
783        config_mcp_server_reload,
784        "config/mcpServer/reload",
785        EmptyObject
786    );
787    typed_null_method!(account_logout, "account/logout", EmptyObject);
788    typed_null_method!(
789        account_rate_limits_read,
790        "account/rateLimits/read",
791        responses::AccountRateLimitsReadResult
792    );
793    typed_null_method!(
794        config_requirements_read,
795        "configRequirements/read",
796        responses::ConfigRequirementsReadResult
797    );
798
799    async fn send_notification<P: Serialize>(
800        &self,
801        method: &str,
802        params: P,
803        requires_ready: bool,
804    ) -> Result<(), ClientError> {
805        if requires_ready && !self.inner.ready.load(Ordering::SeqCst) {
806            return Err(ClientError::NotReady {
807                method: method.to_string(),
808            });
809        }
810
811        let value = serde_json::to_value(params)?;
812        self.send_message(json!({ "method": method, "params": value }))
813            .await
814    }
815
816    async fn request_typed_internal<P, R>(
817        &self,
818        method: &str,
819        params: P,
820        timeout: Option<Duration>,
821        requires_ready: bool,
822    ) -> Result<R, ClientError>
823    where
824        P: Serialize,
825        R: serde::de::DeserializeOwned,
826    {
827        let value = serde_json::to_value(params)?;
828        self.request_typed_value_internal(method, value, timeout, requires_ready)
829            .await
830    }
831
832    async fn request_typed_value_internal<R>(
833        &self,
834        method: &str,
835        params: Value,
836        timeout: Option<Duration>,
837        requires_ready: bool,
838    ) -> Result<R, ClientError>
839    where
840        R: serde::de::DeserializeOwned,
841    {
842        let raw = self
843            .request_value_internal(method, params, timeout, requires_ready)
844            .await?;
845
846        serde_json::from_value(raw).map_err(|source| ClientError::UnexpectedResult {
847            method: method.to_string(),
848            source,
849        })
850    }
851
852    async fn request_value_internal(
853        &self,
854        method: &str,
855        params: Value,
856        timeout: Option<Duration>,
857        requires_ready: bool,
858    ) -> Result<Value, ClientError> {
859        if requires_ready && !self.inner.ready.load(Ordering::SeqCst) {
860            return Err(ClientError::NotReady {
861                method: method.to_string(),
862            });
863        }
864
865        if method == "initialize" && self.inner.initialized.load(Ordering::SeqCst) {
866            return Err(ClientError::AlreadyInitialized);
867        }
868
869        let id_num = self.inner.next_id.fetch_add(1, Ordering::SeqCst);
870        let id = RequestId::Integer(id_num);
871
872        let request = json!({
873            "method": method,
874            "id": id,
875            "params": params,
876        });
877
878        let (tx, rx) = oneshot::channel();
879        self.inner.pending.lock().await.insert(id.clone(), tx);
880
881        if let Err(err) = self.send_message(request).await {
882            self.inner.pending.lock().await.remove(&id);
883            return Err(err);
884        }
885
886        let timeout = timeout.unwrap_or(self.inner.default_timeout);
887        match tokio::time::timeout(timeout, rx).await {
888            Ok(Ok(Ok(value))) => Ok(value),
889            Ok(Ok(Err(error))) => Err(ClientError::Rpc { error }),
890            Ok(Err(_)) => Err(ClientError::TransportClosed),
891            Err(_) => {
892                self.inner.pending.lock().await.remove(&id);
893                Err(ClientError::Timeout {
894                    method: method.to_string(),
895                    timeout_ms: timeout.as_millis() as u64,
896                })
897            }
898        }
899    }
900
901    async fn send_message(&self, value: Value) -> Result<(), ClientError> {
902        self.inner.outbound.send(value).await.map_err(|err| {
903            ClientError::TransportSend(format!("failed to send outbound frame: {err}"))
904        })
905    }
906}
907
908async fn run_inbound_loop(
909    mut inbound: mpsc::Receiver<Result<Value, ClientError>>,
910    inner: Arc<Inner>,
911) {
912    while let Some(frame) = inbound.recv().await {
913        match frame {
914            Ok(value) => {
915                if let Err(err) = process_incoming_value(value, &inner).await {
916                    fail_all_pending(&inner, &format!("processing inbound frame failed: {err}"))
917                        .await;
918                    let _ = inner.event_tx.send(ServerEvent::TransportClosed);
919                    break;
920                }
921            }
922            Err(err) => {
923                fail_all_pending(&inner, &format!("transport error: {err}")).await;
924                let _ = inner.event_tx.send(ServerEvent::TransportClosed);
925                break;
926            }
927        }
928    }
929}
930
931async fn process_incoming_value(value: Value, inner: &Arc<Inner>) -> Result<(), ClientError> {
932    match classify_incoming(value)? {
933        IncomingClassified::Response { id, result } => {
934            if let Some(sender) = inner.pending.lock().await.remove(&id) {
935                let _ = sender.send(result);
936            }
937        }
938        IncomingClassified::Notification {
939            method,
940            params,
941            raw: _,
942        } => {
943            let parsed = parse_notification(method.clone(), params.clone())
944                .unwrap_or(ServerNotification::Unknown { method, params });
945            let _ = inner.event_tx.send(ServerEvent::Notification(parsed));
946        }
947        IncomingClassified::ServerRequest {
948            id,
949            method,
950            params,
951            raw: _,
952        } => {
953            let parsed = parse_server_request(id.clone(), method.clone(), params.clone())
954                .unwrap_or(ServerRequestEvent::Unknown { id, method, params });
955            if !try_auto_handle_server_request(inner, &parsed).await {
956                let _ = inner.event_tx.send(ServerEvent::ServerRequest(parsed));
957            }
958        }
959    }
960    Ok(())
961}
962
963async fn try_auto_handle_server_request(inner: &Arc<Inner>, request: &ServerRequestEvent) -> bool {
964    match request {
965        ServerRequestEvent::ChatgptAuthTokensRefresh { id, params } => {
966            let handler = inner.refresh_handler.read().await.clone();
967            let Some(handler) = handler else {
968                return false;
969            };
970
971            let response = handler(params.clone()).await;
972            send_server_request_handler_result(inner, id, response, "chatgptAuthTokens refresh")
973                .await
974        }
975        ServerRequestEvent::ApplyPatchApproval { id, params } => {
976            let handler = inner.apply_patch_approval_handler.read().await.clone();
977            let Some(handler) = handler else {
978                return false;
979            };
980
981            let response = handler(params.clone()).await;
982            send_server_request_handler_result(inner, id, response, "applyPatchApproval").await
983        }
984        ServerRequestEvent::ExecCommandApproval { id, params } => {
985            let handler = inner.exec_command_approval_handler.read().await.clone();
986            let Some(handler) = handler else {
987                return false;
988            };
989
990            let response = handler(params.clone()).await;
991            send_server_request_handler_result(inner, id, response, "execCommandApproval").await
992        }
993        ServerRequestEvent::CommandExecutionRequestApproval { id, params } => {
994            let handler = inner
995                .command_execution_request_approval_handler
996                .read()
997                .await
998                .clone();
999            let Some(handler) = handler else {
1000                return false;
1001            };
1002
1003            let response = handler(params.clone()).await;
1004            send_server_request_handler_result(
1005                inner,
1006                id,
1007                response,
1008                "item/commandExecution/requestApproval",
1009            )
1010            .await
1011        }
1012        ServerRequestEvent::FileChangeRequestApproval { id, params } => {
1013            let handler = inner
1014                .file_change_request_approval_handler
1015                .read()
1016                .await
1017                .clone();
1018            let Some(handler) = handler else {
1019                return false;
1020            };
1021
1022            let response = handler(params.clone()).await;
1023            send_server_request_handler_result(
1024                inner,
1025                id,
1026                response,
1027                "item/fileChange/requestApproval",
1028            )
1029            .await
1030        }
1031        ServerRequestEvent::ToolRequestUserInput { id, params } => {
1032            let handler = inner.tool_request_user_input_handler.read().await.clone();
1033            let Some(handler) = handler else {
1034                return false;
1035            };
1036
1037            let response = handler(params.clone()).await;
1038            send_server_request_handler_result(inner, id, response, "item/tool/requestUserInput")
1039                .await
1040        }
1041        ServerRequestEvent::DynamicToolCall { id, params } => {
1042            let handler = inner.dynamic_tool_call_handler.read().await.clone();
1043            let Some(handler) = handler else {
1044                return false;
1045            };
1046
1047            let response = handler(params.clone()).await;
1048            send_server_request_handler_result(inner, id, response, "item/tool/call").await
1049        }
1050        _ => false,
1051    }
1052}
1053
1054async fn send_server_request_handler_result<R: Serialize>(
1055    inner: &Arc<Inner>,
1056    id: &RequestId,
1057    response: Result<R, ClientError>,
1058    context: &str,
1059) -> bool {
1060    let payload = match response {
1061        Ok(result) => json!({ "id": id, "result": result }),
1062        Err(err) => json!({
1063            "id": id,
1064            "error": {
1065                "code": -32001,
1066                "message": format!("{context} handler failed: {err}")
1067            }
1068        }),
1069    };
1070
1071    if inner.outbound.send(payload).await.is_err() {
1072        let _ = inner.event_tx.send(ServerEvent::TransportClosed);
1073    }
1074
1075    true
1076}
1077
1078async fn fail_all_pending(inner: &Arc<Inner>, message: &str) {
1079    let mut pending = inner.pending.lock().await;
1080    let entries = std::mem::take(&mut *pending);
1081    drop(pending);
1082
1083    for (_, sender) in entries {
1084        let _ = sender.send(Err(RpcError {
1085            code: -32098,
1086            message: message.to_string(),
1087            data: None,
1088        }));
1089    }
1090}
1091
1092#[cfg(test)]
1093mod tests {
1094    use super::*;
1095    use tokio::time::{Duration, timeout};
1096
1097    fn test_client() -> (
1098        CodexClient,
1099        mpsc::Sender<Result<Value, ClientError>>,
1100        mpsc::Receiver<Value>,
1101    ) {
1102        let (transport_outbound_tx, transport_outbound_rx) = mpsc::channel::<Value>(32);
1103        let (transport_inbound_tx, transport_inbound_rx) =
1104            mpsc::channel::<Result<Value, ClientError>>(32);
1105        let client = CodexClient::from_transport(
1106            TransportHandle {
1107                outbound: transport_outbound_tx,
1108                inbound: transport_inbound_rx,
1109            },
1110            Duration::from_secs(5),
1111        );
1112        (client, transport_inbound_tx, transport_outbound_rx)
1113    }
1114
1115    #[tokio::test]
1116    async fn auto_handles_apply_patch_approval_when_handler_registered() {
1117        let (client, inbound_tx, mut outbound_rx) = test_client();
1118
1119        client
1120            .set_apply_patch_approval_handler(|_| async {
1121                let mut response = server_requests::ApplyPatchApprovalResponse::default();
1122                response
1123                    .extra
1124                    .insert("decision".to_string(), Value::String("approve".to_string()));
1125                Ok(response)
1126            })
1127            .await;
1128
1129        inbound_tx
1130            .send(Ok(json!({
1131                "id": 42,
1132                "method": "applyPatchApproval",
1133                "params": {}
1134            })))
1135            .await
1136            .expect("send inbound server request");
1137
1138        let outbound = timeout(Duration::from_secs(2), outbound_rx.recv())
1139            .await
1140            .expect("timed out waiting for outbound response")
1141            .expect("expected outbound response frame");
1142        assert_eq!(outbound.get("id"), Some(&json!(42)));
1143        assert_eq!(
1144            outbound.pointer("/result/decision"),
1145            Some(&Value::String("approve".to_string()))
1146        );
1147    }
1148
1149    #[tokio::test]
1150    async fn unhandled_server_request_is_published_as_event() {
1151        let (client, inbound_tx, mut outbound_rx) = test_client();
1152
1153        inbound_tx
1154            .send(Ok(json!({
1155                "id": 7,
1156                "method": "applyPatchApproval",
1157                "params": {}
1158            })))
1159            .await
1160            .expect("send inbound server request");
1161
1162        let event = timeout(Duration::from_secs(2), client.next_event())
1163            .await
1164            .expect("timed out waiting for event")
1165            .expect("event receive");
1166
1167        match event {
1168            ServerEvent::ServerRequest(ServerRequestEvent::ApplyPatchApproval { id, .. }) => {
1169                assert_eq!(id, RequestId::Integer(7));
1170            }
1171            other => panic!("unexpected event: {other:?}"),
1172        }
1173
1174        assert!(
1175            timeout(Duration::from_millis(200), outbound_rx.recv())
1176                .await
1177                .is_err(),
1178            "did not expect auto-response when handler is absent"
1179        );
1180    }
1181}