Skip to main content

codex_app_server_sdk/client/
mod.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
6use std::time::Duration;
7
8use serde::Serialize;
9use serde_json::{Value, json};
10use tokio::sync::{Mutex, RwLock, broadcast, mpsc, oneshot};
11
12use crate::api::{Codex, ResumeThread, Thread, ThreadOptions};
13use crate::error::{ClientError, IncomingClassified, RpcError, classify_incoming};
14use crate::events::{
15    ServerEvent, ServerNotification, ServerRequestEvent, parse_notification, parse_server_request,
16};
17use crate::protocol::requests;
18use crate::protocol::responses;
19use crate::protocol::server_requests;
20use crate::protocol::shared::{EmptyObject, RequestId};
21use crate::transport::TransportHandle;
22use crate::transport::stdio::spawn_stdio_transport;
23use crate::transport::ws::connect_ws_transport;
24use crate::transport::ws_daemon::ensure_local_ws_app_server;
25
26type PendingMap = HashMap<RequestId, oneshot::Sender<Result<Value, RpcError>>>;
27type RefreshFuture = Pin<
28    Box<
29        dyn Future<Output = Result<server_requests::ChatgptAuthTokensRefreshResponse, ClientError>>
30            + Send,
31    >,
32>;
33type RefreshHandler =
34    Arc<dyn Fn(server_requests::ChatgptAuthTokensRefreshParams) -> RefreshFuture + Send + Sync>;
35type ApplyPatchApprovalFuture = Pin<
36    Box<
37        dyn Future<Output = Result<server_requests::ApplyPatchApprovalResponse, ClientError>>
38            + Send,
39    >,
40>;
41type ApplyPatchApprovalHandler = Arc<
42    dyn Fn(server_requests::ApplyPatchApprovalParams) -> ApplyPatchApprovalFuture + Send + Sync,
43>;
44type ExecCommandApprovalFuture = Pin<
45    Box<
46        dyn Future<Output = Result<server_requests::ExecCommandApprovalResponse, ClientError>>
47            + Send,
48    >,
49>;
50type ExecCommandApprovalHandler = Arc<
51    dyn Fn(server_requests::ExecCommandApprovalParams) -> ExecCommandApprovalFuture + Send + Sync,
52>;
53type CommandExecutionRequestApprovalFuture = Pin<
54    Box<
55        dyn Future<
56                Output = Result<
57                    server_requests::CommandExecutionRequestApprovalResponse,
58                    ClientError,
59                >,
60            > + Send,
61    >,
62>;
63type CommandExecutionRequestApprovalHandler = Arc<
64    dyn Fn(
65            server_requests::CommandExecutionRequestApprovalParams,
66        ) -> CommandExecutionRequestApprovalFuture
67        + Send
68        + Sync,
69>;
70type FileChangeRequestApprovalFuture = Pin<
71    Box<
72        dyn Future<Output = Result<server_requests::FileChangeRequestApprovalResponse, ClientError>>
73            + Send,
74    >,
75>;
76type FileChangeRequestApprovalHandler = Arc<
77    dyn Fn(server_requests::FileChangeRequestApprovalParams) -> FileChangeRequestApprovalFuture
78        + Send
79        + Sync,
80>;
81type ToolRequestUserInputFuture = Pin<
82    Box<
83        dyn Future<Output = Result<server_requests::ToolRequestUserInputResponse, ClientError>>
84            + Send,
85    >,
86>;
87type ToolRequestUserInputHandler = Arc<
88    dyn Fn(server_requests::ToolRequestUserInputParams) -> ToolRequestUserInputFuture + Send + Sync,
89>;
90type DynamicToolCallFuture = Pin<
91    Box<dyn Future<Output = Result<server_requests::DynamicToolCallResponse, ClientError>> + Send>,
92>;
93type DynamicToolCallHandler =
94    Arc<dyn Fn(server_requests::DynamicToolCallParams) -> DynamicToolCallFuture + Send + Sync>;
95
96#[derive(Debug, Clone)]
97pub struct ClientOptions {
98    pub default_timeout: Duration,
99}
100
101impl Default for ClientOptions {
102    fn default() -> Self {
103        Self {
104            default_timeout: Duration::from_secs(30),
105        }
106    }
107}
108
109#[derive(Debug, Clone)]
110pub struct StdioConfig {
111    pub codex_binary: String,
112    pub args: Vec<String>,
113    pub env: HashMap<String, String>,
114    pub options: ClientOptions,
115}
116
117impl Default for StdioConfig {
118    fn default() -> Self {
119        Self {
120            codex_binary: "codex".to_string(),
121            args: vec!["app-server".to_string()],
122            env: HashMap::new(),
123            options: ClientOptions::default(),
124        }
125    }
126}
127
128#[derive(Debug, Clone)]
129pub struct WsConfig {
130    pub url: String,
131    pub env: HashMap<String, String>,
132    pub options: ClientOptions,
133}
134
135impl WsConfig {
136    pub fn new(
137        url: impl Into<String>,
138        env: HashMap<String, String>,
139        options: ClientOptions,
140    ) -> Self {
141        Self {
142            url: url.into(),
143            env,
144            options,
145        }
146    }
147
148    pub fn with_url(mut self, url: impl Into<String>) -> Self {
149        self.url = url.into();
150        self
151    }
152
153    pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
154        self.env = env;
155        self
156    }
157}
158
159impl Default for WsConfig {
160    fn default() -> Self {
161        Self {
162            url: String::from("ws://127.0.0.1:4222"),
163            env: HashMap::new(),
164            options: ClientOptions::default(),
165        }
166    }
167}
168
169struct Inner {
170    outbound: mpsc::Sender<Value>,
171    pending: Mutex<PendingMap>,
172    default_timeout: Duration,
173    initialized: AtomicBool,
174    ready: AtomicBool,
175    next_id: AtomicI64,
176    event_tx: broadcast::Sender<ServerEvent>,
177    event_rx: Mutex<broadcast::Receiver<ServerEvent>>,
178    refresh_handler: RwLock<Option<RefreshHandler>>,
179    apply_patch_approval_handler: RwLock<Option<ApplyPatchApprovalHandler>>,
180    exec_command_approval_handler: RwLock<Option<ExecCommandApprovalHandler>>,
181    command_execution_request_approval_handler:
182        RwLock<Option<CommandExecutionRequestApprovalHandler>>,
183    file_change_request_approval_handler: RwLock<Option<FileChangeRequestApprovalHandler>>,
184    tool_request_user_input_handler: RwLock<Option<ToolRequestUserInputHandler>>,
185    dynamic_tool_call_handler: RwLock<Option<DynamicToolCallHandler>>,
186}
187
188#[derive(Clone)]
189pub struct CodexClient {
190    inner: Arc<Inner>,
191}
192
193macro_rules! typed_method {
194    ($fn_name:ident, $method:literal, $params_ty:ty, $result_ty:ty) => {
195        pub async fn $fn_name(&self, params: $params_ty) -> Result<$result_ty, ClientError> {
196            self.request_typed_internal($method, params, None, true)
197                .await
198        }
199    };
200}
201
202macro_rules! typed_null_method {
203    ($fn_name:ident, $method:literal, $result_ty:ty) => {
204        pub async fn $fn_name(&self) -> Result<$result_ty, ClientError> {
205            self.request_typed_value_internal($method, Value::Null, None, true)
206                .await
207        }
208    };
209}
210
211impl CodexClient {
212    pub async fn spawn_stdio(config: StdioConfig) -> Result<Self, ClientError> {
213        let handle = spawn_stdio_transport(&config.codex_binary, &config.args, &config.env).await?;
214        Ok(Self::from_transport(handle, config.options.default_timeout))
215    }
216
217    pub async fn connect_ws(config: WsConfig) -> Result<Self, ClientError> {
218        let handle = connect_ws_transport(&config.url).await?;
219        Ok(Self::from_transport(handle, config.options.default_timeout))
220    }
221
222    pub async fn start_and_connect_ws(config: WsConfig) -> Result<Self, ClientError> {
223        ensure_local_ws_app_server(&config.url, &config.env).await?;
224
225        let handle = connect_ws_transport(&config.url).await?;
226        Ok(Self::from_transport(handle, config.options.default_timeout))
227    }
228
229    fn from_transport(handle: TransportHandle, default_timeout: Duration) -> Self {
230        let (event_tx, event_rx) = broadcast::channel(1024);
231        let inner = Arc::new(Inner {
232            outbound: handle.outbound,
233            pending: Mutex::new(HashMap::new()),
234            default_timeout,
235            initialized: AtomicBool::new(false),
236            ready: AtomicBool::new(false),
237            next_id: AtomicI64::new(1),
238            event_tx,
239            event_rx: Mutex::new(event_rx),
240            refresh_handler: RwLock::new(None),
241            apply_patch_approval_handler: RwLock::new(None),
242            exec_command_approval_handler: RwLock::new(None),
243            command_execution_request_approval_handler: RwLock::new(None),
244            file_change_request_approval_handler: RwLock::new(None),
245            tool_request_user_input_handler: RwLock::new(None),
246            dynamic_tool_call_handler: RwLock::new(None),
247        });
248
249        tokio::spawn(run_inbound_loop(handle.inbound, inner.clone()));
250        Self { inner }
251    }
252
253    pub fn as_api(&self) -> Codex {
254        Codex::from_client(self.clone())
255    }
256
257    pub fn start_thread(&self, options: ThreadOptions) -> Thread {
258        self.as_api().start_thread(options)
259    }
260
261    pub fn resume_thread(&self, target: impl Into<ResumeThread>, options: ThreadOptions) -> Thread {
262        self.as_api().resume_thread(target, options)
263    }
264
265    pub fn resume_thread_by_id(&self, id: impl Into<String>, options: ThreadOptions) -> Thread {
266        self.as_api().resume_thread_by_id(id, options)
267    }
268
269    pub fn resume_latest_thread(&self, options: ThreadOptions) -> Thread {
270        self.as_api().resume_latest_thread(options)
271    }
272
273    pub fn subscribe(&self) -> broadcast::Receiver<ServerEvent> {
274        self.inner.event_tx.subscribe()
275    }
276
277    pub async fn next_event(&self) -> Result<ServerEvent, ClientError> {
278        let mut rx = self.inner.event_rx.lock().await;
279        rx.recv().await.map_err(|err| {
280            ClientError::TransportSend(format!("event channel receive failed: {err}"))
281        })
282    }
283
284    pub async fn set_chatgpt_auth_tokens_refresh_handler<F, Fut>(&self, handler: F)
285    where
286        F: Fn(server_requests::ChatgptAuthTokensRefreshParams) -> Fut + Send + Sync + 'static,
287        Fut: Future<Output = Result<server_requests::ChatgptAuthTokensRefreshResponse, ClientError>>
288            + Send
289            + 'static,
290    {
291        let wrapped: RefreshHandler = Arc::new(move |params| Box::pin(handler(params)));
292        *self.inner.refresh_handler.write().await = Some(wrapped);
293    }
294
295    pub async fn clear_chatgpt_auth_tokens_refresh_handler(&self) {
296        *self.inner.refresh_handler.write().await = None;
297    }
298
299    pub async fn set_apply_patch_approval_handler<F, Fut>(&self, handler: F)
300    where
301        F: Fn(server_requests::ApplyPatchApprovalParams) -> Fut + Send + Sync + 'static,
302        Fut: Future<Output = Result<server_requests::ApplyPatchApprovalResponse, ClientError>>
303            + Send
304            + 'static,
305    {
306        let wrapped: ApplyPatchApprovalHandler = Arc::new(move |params| Box::pin(handler(params)));
307        *self.inner.apply_patch_approval_handler.write().await = Some(wrapped);
308    }
309
310    pub async fn clear_apply_patch_approval_handler(&self) {
311        *self.inner.apply_patch_approval_handler.write().await = None;
312    }
313
314    pub async fn set_exec_command_approval_handler<F, Fut>(&self, handler: F)
315    where
316        F: Fn(server_requests::ExecCommandApprovalParams) -> Fut + Send + Sync + 'static,
317        Fut: Future<Output = Result<server_requests::ExecCommandApprovalResponse, ClientError>>
318            + Send
319            + 'static,
320    {
321        let wrapped: ExecCommandApprovalHandler = Arc::new(move |params| Box::pin(handler(params)));
322        *self.inner.exec_command_approval_handler.write().await = Some(wrapped);
323    }
324
325    pub async fn clear_exec_command_approval_handler(&self) {
326        *self.inner.exec_command_approval_handler.write().await = None;
327    }
328
329    pub async fn set_command_execution_request_approval_handler<F, Fut>(&self, handler: F)
330    where
331        F: Fn(server_requests::CommandExecutionRequestApprovalParams) -> Fut
332            + Send
333            + Sync
334            + 'static,
335        Fut: Future<
336                Output = Result<
337                    server_requests::CommandExecutionRequestApprovalResponse,
338                    ClientError,
339                >,
340            > + Send
341            + 'static,
342    {
343        let wrapped: CommandExecutionRequestApprovalHandler =
344            Arc::new(move |params| Box::pin(handler(params)));
345        *self
346            .inner
347            .command_execution_request_approval_handler
348            .write()
349            .await = Some(wrapped);
350    }
351
352    pub async fn clear_command_execution_request_approval_handler(&self) {
353        *self
354            .inner
355            .command_execution_request_approval_handler
356            .write()
357            .await = None;
358    }
359
360    pub async fn set_file_change_request_approval_handler<F, Fut>(&self, handler: F)
361    where
362        F: Fn(server_requests::FileChangeRequestApprovalParams) -> Fut + Send + Sync + 'static,
363        Fut: Future<Output = Result<server_requests::FileChangeRequestApprovalResponse, ClientError>>
364            + Send
365            + 'static,
366    {
367        let wrapped: FileChangeRequestApprovalHandler =
368            Arc::new(move |params| Box::pin(handler(params)));
369        *self
370            .inner
371            .file_change_request_approval_handler
372            .write()
373            .await = Some(wrapped);
374    }
375
376    pub async fn clear_file_change_request_approval_handler(&self) {
377        *self
378            .inner
379            .file_change_request_approval_handler
380            .write()
381            .await = None;
382    }
383
384    pub async fn set_tool_request_user_input_handler<F, Fut>(&self, handler: F)
385    where
386        F: Fn(server_requests::ToolRequestUserInputParams) -> Fut + Send + Sync + 'static,
387        Fut: Future<Output = Result<server_requests::ToolRequestUserInputResponse, ClientError>>
388            + Send
389            + 'static,
390    {
391        let wrapped: ToolRequestUserInputHandler =
392            Arc::new(move |params| Box::pin(handler(params)));
393        *self.inner.tool_request_user_input_handler.write().await = Some(wrapped);
394    }
395
396    pub async fn clear_tool_request_user_input_handler(&self) {
397        *self.inner.tool_request_user_input_handler.write().await = None;
398    }
399
400    pub async fn set_dynamic_tool_call_handler<F, Fut>(&self, handler: F)
401    where
402        F: Fn(server_requests::DynamicToolCallParams) -> Fut + Send + Sync + 'static,
403        Fut: Future<Output = Result<server_requests::DynamicToolCallResponse, ClientError>>
404            + Send
405            + 'static,
406    {
407        let wrapped: DynamicToolCallHandler = Arc::new(move |params| Box::pin(handler(params)));
408        *self.inner.dynamic_tool_call_handler.write().await = Some(wrapped);
409    }
410
411    pub async fn clear_dynamic_tool_call_handler(&self) {
412        *self.inner.dynamic_tool_call_handler.write().await = None;
413    }
414
415    pub async fn initialize(
416        &self,
417        params: requests::InitializeParams,
418    ) -> Result<responses::InitializeResult, ClientError> {
419        if self.inner.initialized.load(Ordering::SeqCst) {
420            return Err(ClientError::AlreadyInitialized);
421        }
422
423        let result: responses::InitializeResult = self
424            .request_typed_internal("initialize", params, None, false)
425            .await?;
426
427        self.inner.initialized.store(true, Ordering::SeqCst);
428        Ok(result)
429    }
430
431    pub async fn initialized(&self) -> Result<(), ClientError> {
432        if !self.inner.initialized.load(Ordering::SeqCst) {
433            return Err(ClientError::NotInitialized {
434                method: "initialized".to_string(),
435            });
436        }
437        self.send_notification("initialized", EmptyObject::default(), false)
438            .await?;
439        self.inner.ready.store(true, Ordering::SeqCst);
440        Ok(())
441    }
442
443    pub async fn send_raw_request(
444        &self,
445        method: impl Into<String>,
446        params: Value,
447        timeout: Option<Duration>,
448    ) -> Result<Value, ClientError> {
449        let method = method.into();
450        let requires_ready = method != "initialize";
451        self.request_value_internal(&method, params, timeout, requires_ready)
452            .await
453    }
454
455    pub async fn send_raw_notification(
456        &self,
457        method: impl Into<String>,
458        params: Value,
459    ) -> Result<(), ClientError> {
460        let method = method.into();
461        let requires_ready = method != "initialized";
462        self.send_notification(&method, params, requires_ready)
463            .await
464    }
465
466    pub async fn respond_server_request<R: Serialize>(
467        &self,
468        id: RequestId,
469        result: R,
470    ) -> Result<(), ClientError> {
471        let result = serde_json::to_value(result)?;
472        self.send_message(json!({ "id": id, "result": result }))
473            .await
474    }
475
476    pub async fn respond_server_request_error(
477        &self,
478        id: RequestId,
479        error: RpcError,
480    ) -> Result<(), ClientError> {
481        self.send_message(json!({ "id": id, "error": error })).await
482    }
483
484    pub async fn respond_chatgpt_auth_tokens_refresh(
485        &self,
486        id: RequestId,
487        response: server_requests::ChatgptAuthTokensRefreshResponse,
488    ) -> Result<(), ClientError> {
489        self.respond_server_request(id, response).await
490    }
491
492    pub async fn respond_apply_patch_approval(
493        &self,
494        id: RequestId,
495        response: server_requests::ApplyPatchApprovalResponse,
496    ) -> Result<(), ClientError> {
497        self.respond_server_request(id, response).await
498    }
499
500    pub async fn respond_exec_command_approval(
501        &self,
502        id: RequestId,
503        response: server_requests::ExecCommandApprovalResponse,
504    ) -> Result<(), ClientError> {
505        self.respond_server_request(id, response).await
506    }
507
508    pub async fn respond_command_execution_request_approval(
509        &self,
510        id: RequestId,
511        response: server_requests::CommandExecutionRequestApprovalResponse,
512    ) -> Result<(), ClientError> {
513        self.respond_server_request(id, response).await
514    }
515
516    pub async fn respond_file_change_request_approval(
517        &self,
518        id: RequestId,
519        response: server_requests::FileChangeRequestApprovalResponse,
520    ) -> Result<(), ClientError> {
521        self.respond_server_request(id, response).await
522    }
523
524    pub async fn respond_tool_request_user_input(
525        &self,
526        id: RequestId,
527        response: server_requests::ToolRequestUserInputResponse,
528    ) -> Result<(), ClientError> {
529        self.respond_server_request(id, response).await
530    }
531
532    pub async fn respond_dynamic_tool_call(
533        &self,
534        id: RequestId,
535        response: server_requests::DynamicToolCallResponse,
536    ) -> Result<(), ClientError> {
537        self.respond_server_request(id, response).await
538    }
539
540    typed_method!(
541        thread_start,
542        "thread/start",
543        requests::ThreadStartParams,
544        responses::ThreadResult
545    );
546    typed_method!(
547        thread_resume,
548        "thread/resume",
549        requests::ThreadResumeParams,
550        responses::ThreadResult
551    );
552    typed_method!(
553        thread_fork,
554        "thread/fork",
555        requests::ThreadForkParams,
556        responses::ThreadResult
557    );
558    typed_method!(
559        thread_archive,
560        "thread/archive",
561        requests::ThreadArchiveParams,
562        responses::ThreadArchiveResult
563    );
564    typed_method!(
565        thread_name_set,
566        "thread/name/set",
567        requests::ThreadSetNameParams,
568        responses::ThreadSetNameResult
569    );
570    typed_method!(
571        thread_unarchive,
572        "thread/unarchive",
573        requests::ThreadUnarchiveParams,
574        responses::ThreadUnarchiveResult
575    );
576    typed_method!(
577        thread_compact_start,
578        "thread/compact/start",
579        requests::ThreadCompactStartParams,
580        responses::ThreadCompactStartResult
581    );
582    typed_method!(
583        thread_background_terminals_clean,
584        "thread/backgroundTerminals/clean",
585        requests::ThreadBackgroundTerminalsCleanParams,
586        responses::ThreadBackgroundTerminalsCleanResult
587    );
588    typed_method!(
589        thread_rollback,
590        "thread/rollback",
591        requests::ThreadRollbackParams,
592        responses::ThreadRollbackResult
593    );
594    typed_method!(
595        thread_list,
596        "thread/list",
597        requests::ThreadListParams,
598        responses::ThreadListResult
599    );
600    typed_method!(
601        thread_loaded_list,
602        "thread/loaded/list",
603        requests::ThreadLoadedListParams,
604        responses::ThreadLoadedListResult
605    );
606    typed_method!(
607        thread_read,
608        "thread/read",
609        requests::ThreadReadParams,
610        responses::ThreadReadResult
611    );
612    typed_method!(
613        skills_list,
614        "skills/list",
615        requests::SkillsListParams,
616        responses::SkillsListResult
617    );
618    typed_method!(
619        skills_remote_list,
620        "skills/remote/list",
621        requests::SkillsRemoteReadParams,
622        responses::SkillsRemoteReadResult
623    );
624    typed_method!(
625        skills_remote_export,
626        "skills/remote/export",
627        requests::SkillsRemoteWriteParams,
628        responses::SkillsRemoteWriteResult
629    );
630    typed_method!(
631        app_list,
632        "app/list",
633        requests::AppsListParams,
634        responses::AppsListResult
635    );
636    typed_method!(
637        skills_config_write,
638        "skills/config/write",
639        requests::SkillsConfigWriteParams,
640        responses::SkillsConfigWriteResult
641    );
642    typed_method!(
643        turn_start,
644        "turn/start",
645        requests::TurnStartParams,
646        responses::TurnResult
647    );
648    typed_method!(
649        turn_steer,
650        "turn/steer",
651        requests::TurnSteerParams,
652        responses::TurnSteerResult
653    );
654    typed_method!(
655        turn_interrupt,
656        "turn/interrupt",
657        requests::TurnInterruptParams,
658        EmptyObject
659    );
660    typed_method!(
661        review_start,
662        "review/start",
663        requests::ReviewStartParams,
664        responses::ReviewStartResult
665    );
666    typed_method!(
667        model_list,
668        "model/list",
669        requests::ModelListParams,
670        responses::ModelListResult
671    );
672    typed_method!(
673        experimental_feature_list,
674        "experimentalFeature/list",
675        requests::ExperimentalFeatureListParams,
676        responses::ExperimentalFeatureListResult
677    );
678    typed_method!(
679        collaboration_mode_list,
680        "collaborationMode/list",
681        requests::CollaborationModeListParams,
682        responses::CollaborationModeListResult
683    );
684    typed_method!(
685        mock_experimental_method,
686        "mock/experimentalMethod",
687        requests::MockExperimentalMethodParams,
688        responses::MockExperimentalMethodResult
689    );
690    typed_method!(
691        mcp_server_oauth_login,
692        "mcpServer/oauth/login",
693        requests::McpServerOauthLoginParams,
694        responses::McpServerOauthLoginResult
695    );
696    typed_method!(
697        mcp_server_status_list,
698        "mcpServerStatus/list",
699        requests::ListMcpServerStatusParams,
700        responses::McpServerStatusListResult
701    );
702    typed_method!(
703        windows_sandbox_setup_start,
704        "windowsSandbox/setupStart",
705        requests::WindowsSandboxSetupStartParams,
706        responses::WindowsSandboxSetupStartResult
707    );
708    typed_method!(
709        account_login_start,
710        "account/login/start",
711        requests::LoginAccountParams,
712        responses::LoginAccountResult
713    );
714    typed_method!(
715        account_login_cancel,
716        "account/login/cancel",
717        requests::CancelLoginAccountParams,
718        EmptyObject
719    );
720    typed_method!(
721        feedback_upload,
722        "feedback/upload",
723        requests::FeedbackUploadParams,
724        responses::FeedbackUploadResult
725    );
726    typed_method!(
727        command_exec,
728        "command/exec",
729        requests::CommandExecParams,
730        responses::CommandExecResult
731    );
732    typed_method!(
733        config_read,
734        "config/read",
735        requests::ConfigReadParams,
736        responses::ConfigReadResult
737    );
738    typed_method!(
739        config_value_write,
740        "config/value/write",
741        requests::ConfigValueWriteParams,
742        responses::ConfigValueWriteResult
743    );
744    typed_method!(
745        config_batch_write,
746        "config/batchWrite",
747        requests::ConfigBatchWriteParams,
748        responses::ConfigBatchWriteResult
749    );
750    typed_method!(
751        account_read,
752        "account/read",
753        requests::GetAccountParams,
754        responses::GetAccountResult
755    );
756    typed_method!(
757        fuzzy_file_search_session_start,
758        "fuzzyFileSearch/sessionStart",
759        requests::FuzzyFileSearchSessionStartParams,
760        responses::FuzzyFileSearchSessionStartResult
761    );
762    typed_method!(
763        fuzzy_file_search_session_update,
764        "fuzzyFileSearch/sessionUpdate",
765        requests::FuzzyFileSearchSessionUpdateParams,
766        responses::FuzzyFileSearchSessionUpdateResult
767    );
768    typed_method!(
769        fuzzy_file_search_session_stop,
770        "fuzzyFileSearch/sessionStop",
771        requests::FuzzyFileSearchSessionStopParams,
772        responses::FuzzyFileSearchSessionStopResult
773    );
774
775    // Backward-compatible aliases for previous method names.
776    pub async fn skills_remote_read(
777        &self,
778        params: requests::SkillsRemoteReadParams,
779    ) -> Result<responses::SkillsRemoteReadResult, ClientError> {
780        self.skills_remote_list(params).await
781    }
782
783    pub async fn skills_remote_write(
784        &self,
785        params: requests::SkillsRemoteWriteParams,
786    ) -> Result<responses::SkillsRemoteWriteResult, ClientError> {
787        self.skills_remote_export(params).await
788    }
789
790    typed_null_method!(
791        config_mcp_server_reload,
792        "config/mcpServer/reload",
793        EmptyObject
794    );
795    typed_null_method!(account_logout, "account/logout", EmptyObject);
796    typed_null_method!(
797        account_rate_limits_read,
798        "account/rateLimits/read",
799        responses::AccountRateLimitsReadResult
800    );
801    typed_null_method!(
802        config_requirements_read,
803        "configRequirements/read",
804        responses::ConfigRequirementsReadResult
805    );
806
807    async fn send_notification<P: Serialize>(
808        &self,
809        method: &str,
810        params: P,
811        requires_ready: bool,
812    ) -> Result<(), ClientError> {
813        if requires_ready && !self.inner.ready.load(Ordering::SeqCst) {
814            return Err(ClientError::NotReady {
815                method: method.to_string(),
816            });
817        }
818
819        let value = serde_json::to_value(params)?;
820        self.send_message(json!({ "method": method, "params": value }))
821            .await
822    }
823
824    async fn request_typed_internal<P, R>(
825        &self,
826        method: &str,
827        params: P,
828        timeout: Option<Duration>,
829        requires_ready: bool,
830    ) -> Result<R, ClientError>
831    where
832        P: Serialize,
833        R: serde::de::DeserializeOwned,
834    {
835        let value = serde_json::to_value(params)?;
836        self.request_typed_value_internal(method, value, timeout, requires_ready)
837            .await
838    }
839
840    async fn request_typed_value_internal<R>(
841        &self,
842        method: &str,
843        params: Value,
844        timeout: Option<Duration>,
845        requires_ready: bool,
846    ) -> Result<R, ClientError>
847    where
848        R: serde::de::DeserializeOwned,
849    {
850        let raw = self
851            .request_value_internal(method, params, timeout, requires_ready)
852            .await?;
853
854        serde_json::from_value(raw).map_err(|source| ClientError::UnexpectedResult {
855            method: method.to_string(),
856            source,
857        })
858    }
859
860    async fn request_value_internal(
861        &self,
862        method: &str,
863        params: Value,
864        timeout: Option<Duration>,
865        requires_ready: bool,
866    ) -> Result<Value, ClientError> {
867        if requires_ready && !self.inner.ready.load(Ordering::SeqCst) {
868            return Err(ClientError::NotReady {
869                method: method.to_string(),
870            });
871        }
872
873        if method == "initialize" && self.inner.initialized.load(Ordering::SeqCst) {
874            return Err(ClientError::AlreadyInitialized);
875        }
876
877        let id_num = self.inner.next_id.fetch_add(1, Ordering::SeqCst);
878        let id = RequestId::Integer(id_num);
879
880        let request = json!({
881            "method": method,
882            "id": id,
883            "params": params,
884        });
885
886        let (tx, rx) = oneshot::channel();
887        self.inner.pending.lock().await.insert(id.clone(), tx);
888
889        if let Err(err) = self.send_message(request).await {
890            self.inner.pending.lock().await.remove(&id);
891            return Err(err);
892        }
893
894        let timeout = timeout.unwrap_or(self.inner.default_timeout);
895        match tokio::time::timeout(timeout, rx).await {
896            Ok(Ok(Ok(value))) => Ok(value),
897            Ok(Ok(Err(error))) => Err(ClientError::Rpc { error }),
898            Ok(Err(_)) => Err(ClientError::TransportClosed),
899            Err(_) => {
900                self.inner.pending.lock().await.remove(&id);
901                Err(ClientError::Timeout {
902                    method: method.to_string(),
903                    timeout_ms: timeout.as_millis() as u64,
904                })
905            }
906        }
907    }
908
909    async fn send_message(&self, value: Value) -> Result<(), ClientError> {
910        self.inner.outbound.send(value).await.map_err(|err| {
911            ClientError::TransportSend(format!("failed to send outbound frame: {err}"))
912        })
913    }
914}
915
916async fn run_inbound_loop(
917    mut inbound: mpsc::Receiver<Result<Value, ClientError>>,
918    inner: Arc<Inner>,
919) {
920    while let Some(frame) = inbound.recv().await {
921        match frame {
922            Ok(value) => {
923                if let Err(err) = process_incoming_value(value, &inner).await {
924                    fail_all_pending(&inner, &format!("processing inbound frame failed: {err}"))
925                        .await;
926                    let _ = inner.event_tx.send(ServerEvent::TransportClosed);
927                    break;
928                }
929            }
930            Err(err) => {
931                fail_all_pending(&inner, &format!("transport error: {err}")).await;
932                let _ = inner.event_tx.send(ServerEvent::TransportClosed);
933                break;
934            }
935        }
936    }
937}
938
939async fn process_incoming_value(value: Value, inner: &Arc<Inner>) -> Result<(), ClientError> {
940    match classify_incoming(value)? {
941        IncomingClassified::Response { id, result } => {
942            if let Some(sender) = inner.pending.lock().await.remove(&id) {
943                let _ = sender.send(result);
944            }
945        }
946        IncomingClassified::Notification {
947            method,
948            params,
949            raw: _,
950        } => {
951            let parsed = parse_notification(method.clone(), params.clone())
952                .unwrap_or(ServerNotification::Unknown { method, params });
953            let _ = inner.event_tx.send(ServerEvent::Notification(parsed));
954        }
955        IncomingClassified::ServerRequest {
956            id,
957            method,
958            params,
959            raw: _,
960        } => {
961            let parsed = parse_server_request(id.clone(), method.clone(), params.clone())
962                .unwrap_or(ServerRequestEvent::Unknown { id, method, params });
963            if !try_auto_handle_server_request(inner, &parsed).await {
964                let _ = inner.event_tx.send(ServerEvent::ServerRequest(parsed));
965            }
966        }
967    }
968    Ok(())
969}
970
971async fn try_auto_handle_server_request(inner: &Arc<Inner>, request: &ServerRequestEvent) -> bool {
972    match request {
973        ServerRequestEvent::ChatgptAuthTokensRefresh { id, params } => {
974            let handler = inner.refresh_handler.read().await.clone();
975            let Some(handler) = handler else {
976                return false;
977            };
978
979            let response = handler(params.clone()).await;
980            send_server_request_handler_result(inner, id, response, "chatgptAuthTokens refresh")
981                .await
982        }
983        ServerRequestEvent::ApplyPatchApproval { id, params } => {
984            let handler = inner.apply_patch_approval_handler.read().await.clone();
985            let Some(handler) = handler else {
986                return false;
987            };
988
989            let response = handler(params.clone()).await;
990            send_server_request_handler_result(inner, id, response, "applyPatchApproval").await
991        }
992        ServerRequestEvent::ExecCommandApproval { id, params } => {
993            let handler = inner.exec_command_approval_handler.read().await.clone();
994            let Some(handler) = handler else {
995                return false;
996            };
997
998            let response = handler(params.clone()).await;
999            send_server_request_handler_result(inner, id, response, "execCommandApproval").await
1000        }
1001        ServerRequestEvent::CommandExecutionRequestApproval { id, params } => {
1002            let handler = inner
1003                .command_execution_request_approval_handler
1004                .read()
1005                .await
1006                .clone();
1007            let Some(handler) = handler else {
1008                return false;
1009            };
1010
1011            let response = handler(params.clone()).await;
1012            send_server_request_handler_result(
1013                inner,
1014                id,
1015                response,
1016                "item/commandExecution/requestApproval",
1017            )
1018            .await
1019        }
1020        ServerRequestEvent::FileChangeRequestApproval { id, params } => {
1021            let handler = inner
1022                .file_change_request_approval_handler
1023                .read()
1024                .await
1025                .clone();
1026            let Some(handler) = handler else {
1027                return false;
1028            };
1029
1030            let response = handler(params.clone()).await;
1031            send_server_request_handler_result(
1032                inner,
1033                id,
1034                response,
1035                "item/fileChange/requestApproval",
1036            )
1037            .await
1038        }
1039        ServerRequestEvent::ToolRequestUserInput { id, params } => {
1040            let handler = inner.tool_request_user_input_handler.read().await.clone();
1041            let Some(handler) = handler else {
1042                return false;
1043            };
1044
1045            let response = handler(params.clone()).await;
1046            send_server_request_handler_result(inner, id, response, "item/tool/requestUserInput")
1047                .await
1048        }
1049        ServerRequestEvent::DynamicToolCall { id, params } => {
1050            let handler = inner.dynamic_tool_call_handler.read().await.clone();
1051            let Some(handler) = handler else {
1052                return false;
1053            };
1054
1055            let response = handler(params.clone()).await;
1056            send_server_request_handler_result(inner, id, response, "item/tool/call").await
1057        }
1058        _ => false,
1059    }
1060}
1061
1062async fn send_server_request_handler_result<R: Serialize>(
1063    inner: &Arc<Inner>,
1064    id: &RequestId,
1065    response: Result<R, ClientError>,
1066    context: &str,
1067) -> bool {
1068    let payload = match response {
1069        Ok(result) => json!({ "id": id, "result": result }),
1070        Err(err) => json!({
1071            "id": id,
1072            "error": {
1073                "code": -32001,
1074                "message": format!("{context} handler failed: {err}")
1075            }
1076        }),
1077    };
1078
1079    if inner.outbound.send(payload).await.is_err() {
1080        let _ = inner.event_tx.send(ServerEvent::TransportClosed);
1081    }
1082
1083    true
1084}
1085
1086async fn fail_all_pending(inner: &Arc<Inner>, message: &str) {
1087    let mut pending = inner.pending.lock().await;
1088    let entries = std::mem::take(&mut *pending);
1089    drop(pending);
1090
1091    for (_, sender) in entries {
1092        let _ = sender.send(Err(RpcError {
1093            code: -32098,
1094            message: message.to_string(),
1095            data: None,
1096        }));
1097    }
1098}
1099
1100#[cfg(test)]
1101mod tests {
1102    use super::*;
1103    use tokio::time::{Duration, timeout};
1104
1105    fn test_client() -> (
1106        CodexClient,
1107        mpsc::Sender<Result<Value, ClientError>>,
1108        mpsc::Receiver<Value>,
1109    ) {
1110        let (transport_outbound_tx, transport_outbound_rx) = mpsc::channel::<Value>(32);
1111        let (transport_inbound_tx, transport_inbound_rx) =
1112            mpsc::channel::<Result<Value, ClientError>>(32);
1113        let client = CodexClient::from_transport(
1114            TransportHandle {
1115                outbound: transport_outbound_tx,
1116                inbound: transport_inbound_rx,
1117            },
1118            Duration::from_secs(5),
1119        );
1120        (client, transport_inbound_tx, transport_outbound_rx)
1121    }
1122
1123    #[tokio::test]
1124    async fn auto_handles_apply_patch_approval_when_handler_registered() {
1125        let (client, inbound_tx, mut outbound_rx) = test_client();
1126
1127        client
1128            .set_apply_patch_approval_handler(|_| async {
1129                let mut response = server_requests::ApplyPatchApprovalResponse::default();
1130                response
1131                    .extra
1132                    .insert("decision".to_string(), Value::String("approve".to_string()));
1133                Ok(response)
1134            })
1135            .await;
1136
1137        inbound_tx
1138            .send(Ok(json!({
1139                "id": 42,
1140                "method": "applyPatchApproval",
1141                "params": {}
1142            })))
1143            .await
1144            .expect("send inbound server request");
1145
1146        let outbound = timeout(Duration::from_secs(2), outbound_rx.recv())
1147            .await
1148            .expect("timed out waiting for outbound response")
1149            .expect("expected outbound response frame");
1150        assert_eq!(outbound.get("id"), Some(&json!(42)));
1151        assert_eq!(
1152            outbound.pointer("/result/decision"),
1153            Some(&Value::String("approve".to_string()))
1154        );
1155    }
1156
1157    #[tokio::test]
1158    async fn unhandled_server_request_is_published_as_event() {
1159        let (client, inbound_tx, mut outbound_rx) = test_client();
1160
1161        inbound_tx
1162            .send(Ok(json!({
1163                "id": 7,
1164                "method": "applyPatchApproval",
1165                "params": {}
1166            })))
1167            .await
1168            .expect("send inbound server request");
1169
1170        let event = timeout(Duration::from_secs(2), client.next_event())
1171            .await
1172            .expect("timed out waiting for event")
1173            .expect("event receive");
1174
1175        match event {
1176            ServerEvent::ServerRequest(ServerRequestEvent::ApplyPatchApproval { id, .. }) => {
1177                assert_eq!(id, RequestId::Integer(7));
1178            }
1179            other => panic!("unexpected event: {other:?}"),
1180        }
1181
1182        assert!(
1183            timeout(Duration::from_millis(200), outbound_rx.recv())
1184                .await
1185                .is_err(),
1186            "did not expect auto-response when handler is absent"
1187        );
1188    }
1189}