Skip to main content

codex_mobile_bridge/app_server/
manager.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use serde_json::{Value, json};
5
6use super::handshake::{default_opt_out_notification_methods, parse_initialize_info};
7use super::{
8    APP_SERVER_EXPERIMENTAL_API_ENABLED, AppServerInbound, AppServerManager, RunningAppServer,
9};
10
11impl AppServerManager {
12    pub fn new(
13        launch_config: super::AppServerLaunchConfig,
14        inbound_tx: tokio::sync::mpsc::UnboundedSender<AppServerInbound>,
15    ) -> Self {
16        Self {
17            launch_config,
18            inbound_tx,
19            inner: tokio::sync::Mutex::new(None),
20        }
21    }
22
23    pub fn runtime_id(&self) -> &str {
24        &self.launch_config.runtime_id
25    }
26
27    pub async fn start(&self) -> Result<()> {
28        self.ensure_started().await.map(|_| ())
29    }
30
31    pub async fn stop(&self) -> Result<()> {
32        let existing = {
33            let mut guard = self.inner.lock().await;
34            guard.take()
35        };
36
37        if let Some(existing) = existing {
38            existing.stop().await?;
39            for _ in 0..30 {
40                if !existing.is_alive() {
41                    break;
42                }
43                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
44            }
45        }
46
47        Ok(())
48    }
49
50    pub async fn restart(&self) -> Result<()> {
51        self.stop().await?;
52        self.start().await
53    }
54
55    pub async fn request(&self, method: &str, params: Value) -> Result<Value> {
56        let running = self.ensure_started().await?;
57        running.request(method, params).await
58    }
59
60    pub async fn respond(&self, id: Value, result: Value) -> Result<()> {
61        let running = self.ensure_started().await?;
62        running.respond(id, result).await
63    }
64
65    pub async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
66        let running = self.ensure_started().await?;
67        running.respond_error(id, code, message).await
68    }
69
70    async fn ensure_started(&self) -> Result<Arc<RunningAppServer>> {
71        {
72            let guard = self.inner.lock().await;
73            if let Some(existing) = guard.as_ref().filter(|existing| existing.is_alive()) {
74                return Ok(Arc::clone(existing));
75            }
76        }
77
78        let opt_out_notification_methods = default_opt_out_notification_methods();
79        let _ = self.inbound_tx.send(AppServerInbound::Starting {
80            runtime_id: self.launch_config.runtime_id.clone(),
81        });
82        let running =
83            RunningAppServer::spawn(self.launch_config.clone(), self.inbound_tx.clone()).await?;
84        {
85            let mut guard = self.inner.lock().await;
86            *guard = Some(Arc::clone(&running));
87        }
88        let _ = self.inbound_tx.send(AppServerInbound::Initializing {
89            runtime_id: self.launch_config.runtime_id.clone(),
90            experimental_api_enabled: APP_SERVER_EXPERIMENTAL_API_ENABLED,
91            opt_out_notification_methods: opt_out_notification_methods.clone(),
92        });
93
94        let init_result = match running
95            .request(
96                "initialize",
97                json!({
98                    "clientInfo": {
99                        "name": "codex-mobile-bridge",
100                        "title": "Codex Mobile Bridge",
101                        "version": env!("CARGO_PKG_VERSION"),
102                    },
103                    "capabilities": {
104                        "experimentalApi": APP_SERVER_EXPERIMENTAL_API_ENABLED,
105                        "optOutNotificationMethods": opt_out_notification_methods,
106                    }
107                }),
108            )
109            .await
110        {
111            Ok(result) => result,
112            Err(error) => {
113                self.abort_startup(
114                    &running,
115                    APP_SERVER_EXPERIMENTAL_API_ENABLED,
116                    &default_opt_out_notification_methods(),
117                    format!("initialize 失败: {error}"),
118                )
119                .await;
120                return Err(error);
121            }
122        };
123
124        let info = match parse_initialize_info(&init_result) {
125            Ok(info) => info,
126            Err(error) => {
127                self.abort_startup(
128                    &running,
129                    APP_SERVER_EXPERIMENTAL_API_ENABLED,
130                    &default_opt_out_notification_methods(),
131                    format!("initialize 响应解析失败: {error}"),
132                )
133                .await;
134                return Err(error);
135            }
136        };
137        if let Err(error) = running.notify_initialized().await {
138            self.abort_startup(
139                &running,
140                APP_SERVER_EXPERIMENTAL_API_ENABLED,
141                &default_opt_out_notification_methods(),
142                format!("initialized 发送失败: {error}"),
143            )
144            .await;
145            return Err(error);
146        }
147        let _ = self.inbound_tx.send(AppServerInbound::Initialized {
148            runtime_id: self.launch_config.runtime_id.clone(),
149            info,
150            experimental_api_enabled: APP_SERVER_EXPERIMENTAL_API_ENABLED,
151            opt_out_notification_methods: default_opt_out_notification_methods(),
152        });
153        Ok(running)
154    }
155
156    async fn abort_startup(
157        &self,
158        running: &Arc<RunningAppServer>,
159        experimental_api_enabled: bool,
160        opt_out_notification_methods: &[String],
161        message: String,
162    ) {
163        let _ = self.inbound_tx.send(AppServerInbound::HandshakeFailed {
164            runtime_id: self.launch_config.runtime_id.clone(),
165            message,
166            experimental_api_enabled,
167            opt_out_notification_methods: opt_out_notification_methods.to_vec(),
168        });
169        let _ = running.abort().await;
170        let mut guard = self.inner.lock().await;
171        if guard
172            .as_ref()
173            .map(|existing| Arc::ptr_eq(existing, running))
174            .unwrap_or(false)
175        {
176            *guard = None;
177        }
178    }
179}