codex_mobile_bridge/app_server/
manager.rs1use std::sync::Arc;
2
3use anyhow::Result;
4use serde_json::{Value, json};
5
6use super::handshake::{default_opt_out_notification_methods, parse_initialize_info};
7use super::{
8 APP_SERVER_EXPERIMENTAL_API_ENABLED, AppServerInbound, AppServerManager, RunningAppServer,
9};
10
11impl AppServerManager {
12 pub fn new(
13 launch_config: super::AppServerLaunchConfig,
14 inbound_tx: tokio::sync::mpsc::UnboundedSender<AppServerInbound>,
15 ) -> Self {
16 Self {
17 launch_config,
18 inbound_tx,
19 inner: tokio::sync::Mutex::new(None),
20 }
21 }
22
23 pub fn runtime_id(&self) -> &str {
24 &self.launch_config.runtime_id
25 }
26
27 pub async fn start(&self) -> Result<()> {
28 self.ensure_started().await.map(|_| ())
29 }
30
31 pub async fn stop(&self) -> Result<()> {
32 let existing = {
33 let mut guard = self.inner.lock().await;
34 guard.take()
35 };
36
37 if let Some(existing) = existing {
38 existing.stop().await?;
39 for _ in 0..30 {
40 if !existing.is_alive() {
41 break;
42 }
43 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
44 }
45 }
46
47 Ok(())
48 }
49
50 pub async fn restart(&self) -> Result<()> {
51 self.stop().await?;
52 self.start().await
53 }
54
55 pub async fn request(&self, method: &str, params: Value) -> Result<Value> {
56 let running = self.ensure_started().await?;
57 running.request(method, params).await
58 }
59
60 pub async fn respond(&self, id: Value, result: Value) -> Result<()> {
61 let running = self.ensure_started().await?;
62 running.respond(id, result).await
63 }
64
65 pub async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
66 let running = self.ensure_started().await?;
67 running.respond_error(id, code, message).await
68 }
69
70 async fn ensure_started(&self) -> Result<Arc<RunningAppServer>> {
71 {
72 let guard = self.inner.lock().await;
73 if let Some(existing) = guard.as_ref().filter(|existing| existing.is_alive()) {
74 return Ok(Arc::clone(existing));
75 }
76 }
77
78 let opt_out_notification_methods = default_opt_out_notification_methods();
79 let _ = self.inbound_tx.send(AppServerInbound::Starting {
80 runtime_id: self.launch_config.runtime_id.clone(),
81 });
82 let running =
83 RunningAppServer::spawn(self.launch_config.clone(), self.inbound_tx.clone()).await?;
84 {
85 let mut guard = self.inner.lock().await;
86 *guard = Some(Arc::clone(&running));
87 }
88 let _ = self.inbound_tx.send(AppServerInbound::Initializing {
89 runtime_id: self.launch_config.runtime_id.clone(),
90 experimental_api_enabled: APP_SERVER_EXPERIMENTAL_API_ENABLED,
91 opt_out_notification_methods: opt_out_notification_methods.clone(),
92 });
93
94 let init_result = match running
95 .request(
96 "initialize",
97 json!({
98 "clientInfo": {
99 "name": "codex-mobile-bridge",
100 "title": "Codex Mobile Bridge",
101 "version": env!("CARGO_PKG_VERSION"),
102 },
103 "capabilities": {
104 "experimentalApi": APP_SERVER_EXPERIMENTAL_API_ENABLED,
105 "optOutNotificationMethods": opt_out_notification_methods,
106 }
107 }),
108 )
109 .await
110 {
111 Ok(result) => result,
112 Err(error) => {
113 self.abort_startup(
114 &running,
115 APP_SERVER_EXPERIMENTAL_API_ENABLED,
116 &default_opt_out_notification_methods(),
117 format!("initialize 失败: {error}"),
118 )
119 .await;
120 return Err(error);
121 }
122 };
123
124 let info = match parse_initialize_info(&init_result) {
125 Ok(info) => info,
126 Err(error) => {
127 self.abort_startup(
128 &running,
129 APP_SERVER_EXPERIMENTAL_API_ENABLED,
130 &default_opt_out_notification_methods(),
131 format!("initialize 响应解析失败: {error}"),
132 )
133 .await;
134 return Err(error);
135 }
136 };
137 if let Err(error) = running.notify_initialized().await {
138 self.abort_startup(
139 &running,
140 APP_SERVER_EXPERIMENTAL_API_ENABLED,
141 &default_opt_out_notification_methods(),
142 format!("initialized 发送失败: {error}"),
143 )
144 .await;
145 return Err(error);
146 }
147 let _ = self.inbound_tx.send(AppServerInbound::Initialized {
148 runtime_id: self.launch_config.runtime_id.clone(),
149 info,
150 experimental_api_enabled: APP_SERVER_EXPERIMENTAL_API_ENABLED,
151 opt_out_notification_methods: default_opt_out_notification_methods(),
152 });
153 Ok(running)
154 }
155
156 async fn abort_startup(
157 &self,
158 running: &Arc<RunningAppServer>,
159 experimental_api_enabled: bool,
160 opt_out_notification_methods: &[String],
161 message: String,
162 ) {
163 let _ = self.inbound_tx.send(AppServerInbound::HandshakeFailed {
164 runtime_id: self.launch_config.runtime_id.clone(),
165 message,
166 experimental_api_enabled,
167 opt_out_notification_methods: opt_out_notification_methods.to_vec(),
168 });
169 let _ = running.abort().await;
170 let mut guard = self.inner.lock().await;
171 if guard
172 .as_ref()
173 .map(|existing| Arc::ptr_eq(existing, running))
174 .unwrap_or(false)
175 {
176 *guard = None;
177 }
178 }
179}