1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{
4 Arc,
5 atomic::{AtomicBool, AtomicU64, Ordering},
6};
7
8use anyhow::{Context, Result, anyhow, bail};
9use serde::{Deserialize, Serialize};
10use serde_json::{Value, json};
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
12use tokio::process::{Child, ChildStdin, Command};
13use tokio::sync::{Mutex, mpsc, oneshot};
14use tokio::time::{Duration, sleep, timeout};
15use tracing::warn;
16
17use crate::bridge_protocol::{json_string, now_millis};
18
19#[derive(Debug, Clone)]
20pub struct AppServerLaunchConfig {
21 pub runtime_id: String,
22 pub codex_binary: String,
23 pub codex_home: Option<PathBuf>,
24}
25
26#[derive(Debug, Clone)]
27pub struct InitializeInfo {
28 pub user_agent: String,
29 pub codex_home: String,
30 pub platform_family: String,
31 pub platform_os: String,
32}
33
34#[derive(Debug, Clone)]
35pub enum AppServerInbound {
36 Starting {
37 runtime_id: String,
38 },
39 ProcessChanged {
40 runtime_id: String,
41 pid: Option<u32>,
42 running: bool,
43 },
44 Initialized {
45 runtime_id: String,
46 info: InitializeInfo,
47 },
48 Notification {
49 runtime_id: String,
50 method: String,
51 params: Value,
52 },
53 ServerRequest {
54 runtime_id: String,
55 id: Value,
56 method: String,
57 params: Value,
58 },
59 Exited {
60 runtime_id: String,
61 message: String,
62 expected: bool,
63 },
64 LogChunk {
65 runtime_id: String,
66 stream: String,
67 level: String,
68 source: String,
69 message: String,
70 detail: Option<Value>,
71 occurred_at_ms: i64,
72 },
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct RpcErrorPayload {
77 pub code: i64,
78 pub message: String,
79 #[serde(default)]
80 pub data: Option<Value>,
81}
82
83impl std::fmt::Display for RpcErrorPayload {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 write!(f, "[{}] {}", self.code, self.message)
86 }
87}
88
89pub struct AppServerManager {
90 launch_config: AppServerLaunchConfig,
91 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
92 inner: Mutex<Option<Arc<RunningAppServer>>>,
93}
94
95impl AppServerManager {
96 pub fn new(
97 launch_config: AppServerLaunchConfig,
98 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
99 ) -> Self {
100 Self {
101 launch_config,
102 inbound_tx,
103 inner: Mutex::new(None),
104 }
105 }
106
107 pub fn runtime_id(&self) -> &str {
108 &self.launch_config.runtime_id
109 }
110
111 pub async fn start(&self) -> Result<()> {
112 self.ensure_started().await.map(|_| ())
113 }
114
115 pub async fn stop(&self) -> Result<()> {
116 let existing = {
117 let mut guard = self.inner.lock().await;
118 guard.take()
119 };
120
121 if let Some(existing) = existing {
122 existing.stop().await?;
123 for _ in 0..30 {
124 if !existing.is_alive() {
125 break;
126 }
127 sleep(Duration::from_millis(100)).await;
128 }
129 }
130
131 Ok(())
132 }
133
134 pub async fn restart(&self) -> Result<()> {
135 self.stop().await?;
136 self.start().await
137 }
138
139 pub async fn request(&self, method: &str, params: Value) -> Result<Value> {
140 let running = self.ensure_started().await?;
141 running.request(method, params).await
142 }
143
144 pub async fn respond(&self, id: Value, result: Value) -> Result<()> {
145 let running = self.ensure_started().await?;
146 running.respond(id, result).await
147 }
148
149 pub async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
150 let running = self.ensure_started().await?;
151 running.respond_error(id, code, message).await
152 }
153
154 async fn ensure_started(&self) -> Result<Arc<RunningAppServer>> {
155 {
156 let guard = self.inner.lock().await;
157 if let Some(existing) = guard.as_ref().filter(|existing| existing.is_alive()) {
158 return Ok(Arc::clone(existing));
159 }
160 }
161
162 let _ = self.inbound_tx.send(AppServerInbound::Starting {
163 runtime_id: self.launch_config.runtime_id.clone(),
164 });
165 let running =
166 RunningAppServer::spawn(self.launch_config.clone(), self.inbound_tx.clone()).await?;
167 {
168 let mut guard = self.inner.lock().await;
169 *guard = Some(Arc::clone(&running));
170 }
171
172 let init_result = running
173 .request(
174 "initialize",
175 json!({
176 "clientInfo": {
177 "name": "codex-mobile-bridge",
178 "title": "Codex Mobile Bridge",
179 "version": env!("CARGO_PKG_VERSION"),
180 },
181 "capabilities": {
182 "experimentalApi": true
183 }
184 }),
185 )
186 .await?;
187
188 let info = parse_initialize_info(&init_result)?;
189 let _ = self.inbound_tx.send(AppServerInbound::Initialized {
190 runtime_id: self.launch_config.runtime_id.clone(),
191 info,
192 });
193 Ok(running)
194 }
195}
196
197struct RunningAppServer {
198 runtime_id: String,
199 stdin: Arc<Mutex<BufWriter<ChildStdin>>>,
200 child: Arc<Mutex<Option<Child>>>,
201 pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Value, RpcErrorPayload>>>>>,
202 next_id: AtomicU64,
203 alive: Arc<AtomicBool>,
204 stopping: Arc<AtomicBool>,
205}
206
207impl RunningAppServer {
208 async fn spawn(
209 launch_config: AppServerLaunchConfig,
210 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
211 ) -> Result<Arc<Self>> {
212 let mut command = Command::new(&launch_config.codex_binary);
213 command.args(["app-server", "--listen", "stdio://"]);
214 command.stdin(std::process::Stdio::piped());
215 command.stdout(std::process::Stdio::piped());
216 command.stderr(std::process::Stdio::piped());
217 command.kill_on_drop(true);
218 command.env("CODEX_MOBILE_MANAGED", "1");
219 command.env("CODEX_MOBILE_RUNTIME_ID", &launch_config.runtime_id);
220
221 if let Some(codex_home) = launch_config.codex_home.as_ref() {
222 command.env("CODEX_HOME", codex_home);
223 }
224
225 let mut child = command
226 .spawn()
227 .with_context(|| build_spawn_error_context(&launch_config))?;
228 let pid = child.id();
229
230 let stdin = child.stdin.take().context("获取 app-server stdin 失败")?;
231 let stdout = child.stdout.take().context("获取 app-server stdout 失败")?;
232 let stderr = child.stderr.take().context("获取 app-server stderr 失败")?;
233
234 let running = Arc::new(Self {
235 runtime_id: launch_config.runtime_id.clone(),
236 stdin: Arc::new(Mutex::new(BufWriter::new(stdin))),
237 child: Arc::new(Mutex::new(Some(child))),
238 pending: Arc::new(Mutex::new(HashMap::new())),
239 next_id: AtomicU64::new(1),
240 alive: Arc::new(AtomicBool::new(true)),
241 stopping: Arc::new(AtomicBool::new(false)),
242 });
243
244 let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
245 runtime_id: launch_config.runtime_id.clone(),
246 pid,
247 running: true,
248 });
249
250 spawn_stdout_task(
251 Arc::clone(&running),
252 launch_config.runtime_id.clone(),
253 stdout,
254 inbound_tx.clone(),
255 );
256 spawn_stderr_task(launch_config.runtime_id.clone(), stderr, inbound_tx.clone());
257 spawn_wait_task(Arc::clone(&running), inbound_tx);
258
259 Ok(running)
260 }
261
262 fn is_alive(&self) -> bool {
263 self.alive.load(Ordering::SeqCst)
264 }
265
266 async fn stop(&self) -> Result<()> {
267 if !self.is_alive() {
268 return Ok(());
269 }
270
271 self.stopping.store(true, Ordering::SeqCst);
272
273 let mut child_guard = self.child.lock().await;
274 let Some(child) = child_guard.as_mut() else {
275 return Ok(());
276 };
277 child
278 .start_kill()
279 .with_context(|| format!("停止 runtime {} 失败", self.runtime_id))?;
280 Ok(())
281 }
282
283 async fn request(&self, method: &str, params: Value) -> Result<Value> {
284 if !self.is_alive() {
285 bail!("app-server 未运行");
286 }
287
288 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
289 let key = id.to_string();
290 let (tx, rx) = oneshot::channel();
291 {
292 let mut pending = self.pending.lock().await;
293 pending.insert(key.clone(), tx);
294 }
295
296 self.send_json(json!({
297 "jsonrpc": "2.0",
298 "id": id,
299 "method": method,
300 "params": params,
301 }))
302 .await?;
303
304 match timeout(Duration::from_secs(90), rx).await {
305 Ok(Ok(Ok(result))) => Ok(result),
306 Ok(Ok(Err(error))) => Err(anyhow!(error.to_string())),
307 Ok(Err(_)) => Err(anyhow!("等待 app-server 响应时通道关闭")),
308 Err(_) => {
309 self.pending.lock().await.remove(&key);
310 Err(anyhow!("等待 app-server 响应超时"))
311 }
312 }
313 }
314
315 async fn respond(&self, id: Value, result: Value) -> Result<()> {
316 self.send_json(json!({
317 "jsonrpc": "2.0",
318 "id": id,
319 "result": result,
320 }))
321 .await
322 }
323
324 async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
325 self.send_json(json!({
326 "jsonrpc": "2.0",
327 "id": id,
328 "error": {
329 "code": code,
330 "message": message,
331 }
332 }))
333 .await
334 }
335
336 async fn send_json(&self, payload: Value) -> Result<()> {
337 let line = serde_json::to_string(&payload)?;
338 let mut writer = self.stdin.lock().await;
339 writer.write_all(line.as_bytes()).await?;
340 writer.write_all(b"\n").await?;
341 writer.flush().await?;
342 Ok(())
343 }
344}
345
346fn spawn_stdout_task(
347 running: Arc<RunningAppServer>,
348 runtime_id: String,
349 stdout: tokio::process::ChildStdout,
350 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
351) {
352 tokio::spawn(async move {
353 let mut reader = BufReader::new(stdout).lines();
354 loop {
355 match reader.next_line().await {
356 Ok(Some(line)) => {
357 if line.trim().is_empty() {
358 continue;
359 }
360 if let Err(error) =
361 handle_stdout_line(&running, &runtime_id, &inbound_tx, &line).await
362 {
363 warn!("解析 app-server 输出失败: {error}");
364 }
365 }
366 Ok(None) => break,
367 Err(error) => {
368 warn!("读取 app-server stdout 失败: {error}");
369 break;
370 }
371 }
372 }
373 });
374}
375
376fn spawn_stderr_task(
377 runtime_id: String,
378 stderr: tokio::process::ChildStderr,
379 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
380) {
381 tokio::spawn(async move {
382 let mut reader = BufReader::new(stderr).lines();
383 loop {
384 match reader.next_line().await {
385 Ok(Some(line)) => {
386 let trimmed = line.trim();
387 if !trimmed.is_empty() {
388 let (level, message) = parse_app_server_stderr_line(trimmed);
389 warn!("app-server stderr [{runtime_id}]: {trimmed}");
390 let _ = inbound_tx.send(AppServerInbound::LogChunk {
391 runtime_id: runtime_id.clone(),
392 stream: "stderr".to_string(),
393 level,
394 source: "app-server".to_string(),
395 message,
396 detail: None,
397 occurred_at_ms: now_millis(),
398 });
399 }
400 }
401 Ok(None) => break,
402 Err(error) => {
403 warn!("读取 app-server stderr 失败: {error}");
404 break;
405 }
406 }
407 }
408 });
409}
410
411fn spawn_wait_task(
412 running: Arc<RunningAppServer>,
413 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
414) {
415 tokio::spawn(async move {
416 let message = loop {
417 let maybe_exit = {
418 let mut guard = running.child.lock().await;
419 let Some(child) = guard.as_mut() else {
420 break format!("app-server 已退出: runtime {}", running.runtime_id);
421 };
422 match child.try_wait() {
423 Ok(Some(status)) => {
424 *guard = None;
425 Some(Ok(status))
426 }
427 Ok(None) => None,
428 Err(error) => {
429 *guard = None;
430 Some(Err(error))
431 }
432 }
433 };
434
435 match maybe_exit {
436 Some(Ok(status)) => break format!("app-server 已退出: {status}"),
437 Some(Err(error)) => break format!("等待 app-server 退出失败: {error}"),
438 None => sleep(Duration::from_millis(300)).await,
439 }
440 };
441
442 running.alive.store(false, Ordering::SeqCst);
443 fail_pending_requests(&running, &message).await;
444 let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
445 runtime_id: running.runtime_id.clone(),
446 pid: None,
447 running: false,
448 });
449 let _ = inbound_tx.send(AppServerInbound::Exited {
450 runtime_id: running.runtime_id.clone(),
451 message,
452 expected: running.stopping.load(Ordering::SeqCst),
453 });
454 });
455}
456
457async fn handle_stdout_line(
458 running: &Arc<RunningAppServer>,
459 runtime_id: &str,
460 inbound_tx: &mpsc::UnboundedSender<AppServerInbound>,
461 line: &str,
462) -> Result<()> {
463 let message: Value = serde_json::from_str(line)?;
464 let method = message.get("method").and_then(Value::as_str);
465 let id = message.get("id").cloned();
466 let result = message.get("result").cloned();
467 let error = message.get("error").cloned();
468
469 match (method, id, result, error) {
470 (Some(method), Some(id), None, None) => {
471 let params = message.get("params").cloned().unwrap_or(Value::Null);
472 let _ = inbound_tx.send(AppServerInbound::ServerRequest {
473 runtime_id: runtime_id.to_string(),
474 id,
475 method: method.to_string(),
476 params,
477 });
478 }
479 (Some(method), None, None, None) => {
480 let params = message.get("params").cloned().unwrap_or(Value::Null);
481 let _ = inbound_tx.send(AppServerInbound::Notification {
482 runtime_id: runtime_id.to_string(),
483 method: method.to_string(),
484 params,
485 });
486 }
487 (None, Some(id), Some(result), _) => {
488 let key = json_string(&id);
489 if let Some(sender) = running.pending.lock().await.remove(&key) {
490 let _ = sender.send(Ok(result));
491 }
492 }
493 (None, Some(id), _, Some(error_value)) => {
494 let key = json_string(&id);
495 if let Some(sender) = running.pending.lock().await.remove(&key) {
496 let payload = serde_json::from_value::<RpcErrorPayload>(error_value)?;
497 let _ = sender.send(Err(payload));
498 }
499 }
500 _ => {
501 warn!("收到未知 app-server 消息: {line}");
502 }
503 }
504
505 Ok(())
506}
507
508async fn fail_pending_requests(running: &RunningAppServer, message: &str) {
509 let mut pending = running.pending.lock().await;
510 for (_, sender) in pending.drain() {
511 let _ = sender.send(Err(RpcErrorPayload {
512 code: -32001,
513 message: message.to_string(),
514 data: None,
515 }));
516 }
517}
518
519fn parse_initialize_info(value: &Value) -> Result<InitializeInfo> {
520 Ok(InitializeInfo {
521 user_agent: required_string(value, "userAgent")?.to_string(),
522 codex_home: required_string(value, "codexHome")?.to_string(),
523 platform_family: required_string(value, "platformFamily")?.to_string(),
524 platform_os: required_string(value, "platformOs")?.to_string(),
525 })
526}
527
528fn required_string<'a>(value: &'a Value, key: &str) -> Result<&'a str> {
529 value
530 .get(key)
531 .and_then(Value::as_str)
532 .with_context(|| format!("缺少字段 {key}"))
533}
534
535fn build_spawn_error_context(launch_config: &AppServerLaunchConfig) -> String {
536 let cwd = std::env::current_dir()
537 .map(|path| path.display().to_string())
538 .unwrap_or_else(|_| "<unknown>".to_string());
539 let path_env = std::env::var("PATH").unwrap_or_else(|_| "<unset>".to_string());
540 let codex_home = launch_config
541 .codex_home
542 .as_ref()
543 .map(|path| path.display().to_string())
544 .unwrap_or_else(|| "<unset>".to_string());
545 format!(
546 "启动 {} app-server 失败(runtime={} cwd={} CODEX_HOME={} PATH={})",
547 launch_config.codex_binary, launch_config.runtime_id, cwd, codex_home, path_env
548 )
549}
550
551fn parse_app_server_stderr_line(line: &str) -> (String, String) {
552 let normalized = line.trim().to_string();
553 let upper = normalized.to_uppercase();
554 let level = if upper.contains(" ERROR ") || upper.starts_with("ERROR ") {
555 "error"
556 } else if upper.contains(" WARN ") || upper.starts_with("WARN ") || upper.contains(" WARNING ")
557 {
558 "warn"
559 } else if upper.contains(" DEBUG ") || upper.starts_with("DEBUG ") {
560 "debug"
561 } else {
562 "info"
563 };
564 (level.to_string(), normalized)
565}