Skip to main content

codex_mobile_bridge/
app_server.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{
4    Arc,
5    atomic::{AtomicBool, AtomicU64, Ordering},
6};
7
8use anyhow::{Context, Result, anyhow, bail};
9use serde::{Deserialize, Serialize};
10use serde_json::{Value, json};
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
12use tokio::process::{Child, ChildStdin, Command};
13use tokio::sync::{Mutex, mpsc, oneshot};
14use tokio::time::{Duration, sleep, timeout};
15use tracing::warn;
16
17use crate::bridge_protocol::{json_string, now_millis};
18
/// Parameters for launching one `app-server` child process.
#[derive(Debug, Clone)]
pub struct AppServerLaunchConfig {
    /// Identifier attached to every inbound event of this runtime; also exported to the child
    /// process as `CODEX_MOBILE_RUNTIME_ID`.
    pub runtime_id: String,
    /// Program that is executed with the `app-server --listen stdio://` arguments.
    pub codex_binary: String,
    /// When set, exported to the child process as `CODEX_HOME`; when `None` the variable is not
    /// set by the launcher.
    pub codex_home: Option<PathBuf>,
}
25
/// String fields extracted from the result of the `initialize` request.
#[derive(Debug, Clone)]
pub struct InitializeInfo {
    /// Value of the `userAgent` field of the response.
    pub user_agent: String,
    /// Value of the `codexHome` field of the response.
    pub codex_home: String,
    /// Value of the `platformFamily` field of the response.
    pub platform_family: String,
    /// Value of the `platformOs` field of the response.
    pub platform_os: String,
}
33
34#[derive(Debug, Clone)]
35pub enum AppServerInbound {
36    Starting {
37        runtime_id: String,
38    },
39    ProcessChanged {
40        runtime_id: String,
41        pid: Option<u32>,
42        running: bool,
43    },
44    Initialized {
45        runtime_id: String,
46        info: InitializeInfo,
47    },
48    Notification {
49        runtime_id: String,
50        method: String,
51        params: Value,
52    },
53    ServerRequest {
54        runtime_id: String,
55        id: Value,
56        method: String,
57        params: Value,
58    },
59    Exited {
60        runtime_id: String,
61        message: String,
62        expected: bool,
63    },
64    LogChunk {
65        runtime_id: String,
66        stream: String,
67        level: String,
68        source: String,
69        message: String,
70        detail: Option<Value>,
71        occurred_at_ms: i64,
72    },
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct RpcErrorPayload {
77    pub code: i64,
78    pub message: String,
79    #[serde(default)]
80    pub data: Option<Value>,
81}
82
83impl std::fmt::Display for RpcErrorPayload {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        write!(f, "[{}] {}", self.code, self.message)
86    }
87}
88
89pub struct AppServerManager {
90    launch_config: AppServerLaunchConfig,
91    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
92    inner: Mutex<Option<Arc<RunningAppServer>>>,
93}
94
95impl AppServerManager {
96    pub fn new(
97        launch_config: AppServerLaunchConfig,
98        inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
99    ) -> Self {
100        Self {
101            launch_config,
102            inbound_tx,
103            inner: Mutex::new(None),
104        }
105    }
106
107    pub fn runtime_id(&self) -> &str {
108        &self.launch_config.runtime_id
109    }
110
111    pub async fn start(&self) -> Result<()> {
112        self.ensure_started().await.map(|_| ())
113    }
114
115    pub async fn stop(&self) -> Result<()> {
116        let existing = {
117            let mut guard = self.inner.lock().await;
118            guard.take()
119        };
120
121        if let Some(existing) = existing {
122            existing.stop().await?;
123            for _ in 0..30 {
124                if !existing.is_alive() {
125                    break;
126                }
127                sleep(Duration::from_millis(100)).await;
128            }
129        }
130
131        Ok(())
132    }
133
134    pub async fn restart(&self) -> Result<()> {
135        self.stop().await?;
136        self.start().await
137    }
138
139    pub async fn request(&self, method: &str, params: Value) -> Result<Value> {
140        let running = self.ensure_started().await?;
141        running.request(method, params).await
142    }
143
144    pub async fn respond(&self, id: Value, result: Value) -> Result<()> {
145        let running = self.ensure_started().await?;
146        running.respond(id, result).await
147    }
148
149    pub async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
150        let running = self.ensure_started().await?;
151        running.respond_error(id, code, message).await
152    }
153
154    async fn ensure_started(&self) -> Result<Arc<RunningAppServer>> {
155        {
156            let guard = self.inner.lock().await;
157            if let Some(existing) = guard.as_ref().filter(|existing| existing.is_alive()) {
158                return Ok(Arc::clone(existing));
159            }
160        }
161
162        let _ = self.inbound_tx.send(AppServerInbound::Starting {
163            runtime_id: self.launch_config.runtime_id.clone(),
164        });
165        let running =
166            RunningAppServer::spawn(self.launch_config.clone(), self.inbound_tx.clone()).await?;
167        {
168            let mut guard = self.inner.lock().await;
169            *guard = Some(Arc::clone(&running));
170        }
171
172        let init_result = running
173            .request(
174                "initialize",
175                json!({
176                    "clientInfo": {
177                        "name": "codex-mobile-bridge",
178                        "title": "Codex Mobile Bridge",
179                        "version": env!("CARGO_PKG_VERSION"),
180                    },
181                    "capabilities": {
182                        "experimentalApi": true
183                    }
184                }),
185            )
186            .await?;
187
188        let info = parse_initialize_info(&init_result)?;
189        let _ = self.inbound_tx.send(AppServerInbound::Initialized {
190            runtime_id: self.launch_config.runtime_id.clone(),
191            info,
192        });
193        Ok(running)
194    }
195}
196
197struct RunningAppServer {
198    runtime_id: String,
199    stdin: Arc<Mutex<BufWriter<ChildStdin>>>,
200    child: Arc<Mutex<Option<Child>>>,
201    pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Value, RpcErrorPayload>>>>>,
202    next_id: AtomicU64,
203    alive: Arc<AtomicBool>,
204    stopping: Arc<AtomicBool>,
205}
206
207impl RunningAppServer {
208    async fn spawn(
209        launch_config: AppServerLaunchConfig,
210        inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
211    ) -> Result<Arc<Self>> {
212        let mut command = Command::new(&launch_config.codex_binary);
213        command.args(["app-server", "--listen", "stdio://"]);
214        command.stdin(std::process::Stdio::piped());
215        command.stdout(std::process::Stdio::piped());
216        command.stderr(std::process::Stdio::piped());
217        command.kill_on_drop(true);
218        command.env("CODEX_MOBILE_MANAGED", "1");
219        command.env("CODEX_MOBILE_RUNTIME_ID", &launch_config.runtime_id);
220
221        if let Some(codex_home) = launch_config.codex_home.as_ref() {
222            command.env("CODEX_HOME", codex_home);
223        }
224
225        let mut child = command
226            .spawn()
227            .with_context(|| build_spawn_error_context(&launch_config))?;
228        let pid = child.id();
229
230        let stdin = child.stdin.take().context("获取 app-server stdin 失败")?;
231        let stdout = child.stdout.take().context("获取 app-server stdout 失败")?;
232        let stderr = child.stderr.take().context("获取 app-server stderr 失败")?;
233
234        let running = Arc::new(Self {
235            runtime_id: launch_config.runtime_id.clone(),
236            stdin: Arc::new(Mutex::new(BufWriter::new(stdin))),
237            child: Arc::new(Mutex::new(Some(child))),
238            pending: Arc::new(Mutex::new(HashMap::new())),
239            next_id: AtomicU64::new(1),
240            alive: Arc::new(AtomicBool::new(true)),
241            stopping: Arc::new(AtomicBool::new(false)),
242        });
243
244        let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
245            runtime_id: launch_config.runtime_id.clone(),
246            pid,
247            running: true,
248        });
249
250        spawn_stdout_task(
251            Arc::clone(&running),
252            launch_config.runtime_id.clone(),
253            stdout,
254            inbound_tx.clone(),
255        );
256        spawn_stderr_task(launch_config.runtime_id.clone(), stderr, inbound_tx.clone());
257        spawn_wait_task(Arc::clone(&running), inbound_tx);
258
259        Ok(running)
260    }
261
262    fn is_alive(&self) -> bool {
263        self.alive.load(Ordering::SeqCst)
264    }
265
266    async fn stop(&self) -> Result<()> {
267        if !self.is_alive() {
268            return Ok(());
269        }
270
271        self.stopping.store(true, Ordering::SeqCst);
272
273        let mut child_guard = self.child.lock().await;
274        let Some(child) = child_guard.as_mut() else {
275            return Ok(());
276        };
277        child
278            .start_kill()
279            .with_context(|| format!("停止 runtime {} 失败", self.runtime_id))?;
280        Ok(())
281    }
282
283    async fn request(&self, method: &str, params: Value) -> Result<Value> {
284        if !self.is_alive() {
285            bail!("app-server 未运行");
286        }
287
288        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
289        let key = id.to_string();
290        let (tx, rx) = oneshot::channel();
291        {
292            let mut pending = self.pending.lock().await;
293            pending.insert(key.clone(), tx);
294        }
295
296        self.send_json(json!({
297            "jsonrpc": "2.0",
298            "id": id,
299            "method": method,
300            "params": params,
301        }))
302        .await?;
303
304        match timeout(Duration::from_secs(90), rx).await {
305            Ok(Ok(Ok(result))) => Ok(result),
306            Ok(Ok(Err(error))) => Err(anyhow!(error.to_string())),
307            Ok(Err(_)) => Err(anyhow!("等待 app-server 响应时通道关闭")),
308            Err(_) => {
309                self.pending.lock().await.remove(&key);
310                Err(anyhow!("等待 app-server 响应超时"))
311            }
312        }
313    }
314
315    async fn respond(&self, id: Value, result: Value) -> Result<()> {
316        self.send_json(json!({
317            "jsonrpc": "2.0",
318            "id": id,
319            "result": result,
320        }))
321        .await
322    }
323
324    async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
325        self.send_json(json!({
326            "jsonrpc": "2.0",
327            "id": id,
328            "error": {
329                "code": code,
330                "message": message,
331            }
332        }))
333        .await
334    }
335
336    async fn send_json(&self, payload: Value) -> Result<()> {
337        let line = serde_json::to_string(&payload)?;
338        let mut writer = self.stdin.lock().await;
339        writer.write_all(line.as_bytes()).await?;
340        writer.write_all(b"\n").await?;
341        writer.flush().await?;
342        Ok(())
343    }
344}
345
346fn spawn_stdout_task(
347    running: Arc<RunningAppServer>,
348    runtime_id: String,
349    stdout: tokio::process::ChildStdout,
350    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
351) {
352    tokio::spawn(async move {
353        let mut reader = BufReader::new(stdout).lines();
354        loop {
355            match reader.next_line().await {
356                Ok(Some(line)) => {
357                    if line.trim().is_empty() {
358                        continue;
359                    }
360                    if let Err(error) =
361                        handle_stdout_line(&running, &runtime_id, &inbound_tx, &line).await
362                    {
363                        warn!("解析 app-server 输出失败: {error}");
364                    }
365                }
366                Ok(None) => break,
367                Err(error) => {
368                    warn!("读取 app-server stdout 失败: {error}");
369                    break;
370                }
371            }
372        }
373    });
374}
375
376fn spawn_stderr_task(
377    runtime_id: String,
378    stderr: tokio::process::ChildStderr,
379    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
380) {
381    tokio::spawn(async move {
382        let mut reader = BufReader::new(stderr).lines();
383        loop {
384            match reader.next_line().await {
385                Ok(Some(line)) => {
386                    let trimmed = line.trim();
387                    if !trimmed.is_empty() {
388                        let (level, message) = parse_app_server_stderr_line(trimmed);
389                        warn!("app-server stderr [{runtime_id}]: {trimmed}");
390                        let _ = inbound_tx.send(AppServerInbound::LogChunk {
391                            runtime_id: runtime_id.clone(),
392                            stream: "stderr".to_string(),
393                            level,
394                            source: "app-server".to_string(),
395                            message,
396                            detail: None,
397                            occurred_at_ms: now_millis(),
398                        });
399                    }
400                }
401                Ok(None) => break,
402                Err(error) => {
403                    warn!("读取 app-server stderr 失败: {error}");
404                    break;
405                }
406            }
407        }
408    });
409}
410
411fn spawn_wait_task(
412    running: Arc<RunningAppServer>,
413    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
414) {
415    tokio::spawn(async move {
416        let message = loop {
417            let maybe_exit = {
418                let mut guard = running.child.lock().await;
419                let Some(child) = guard.as_mut() else {
420                    break format!("app-server 已退出: runtime {}", running.runtime_id);
421                };
422                match child.try_wait() {
423                    Ok(Some(status)) => {
424                        *guard = None;
425                        Some(Ok(status))
426                    }
427                    Ok(None) => None,
428                    Err(error) => {
429                        *guard = None;
430                        Some(Err(error))
431                    }
432                }
433            };
434
435            match maybe_exit {
436                Some(Ok(status)) => break format!("app-server 已退出: {status}"),
437                Some(Err(error)) => break format!("等待 app-server 退出失败: {error}"),
438                None => sleep(Duration::from_millis(300)).await,
439            }
440        };
441
442        running.alive.store(false, Ordering::SeqCst);
443        fail_pending_requests(&running, &message).await;
444        let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
445            runtime_id: running.runtime_id.clone(),
446            pid: None,
447            running: false,
448        });
449        let _ = inbound_tx.send(AppServerInbound::Exited {
450            runtime_id: running.runtime_id.clone(),
451            message,
452            expected: running.stopping.load(Ordering::SeqCst),
453        });
454    });
455}
456
457async fn handle_stdout_line(
458    running: &Arc<RunningAppServer>,
459    runtime_id: &str,
460    inbound_tx: &mpsc::UnboundedSender<AppServerInbound>,
461    line: &str,
462) -> Result<()> {
463    let message: Value = serde_json::from_str(line)?;
464    let method = message.get("method").and_then(Value::as_str);
465    let id = message.get("id").cloned();
466    let result = message.get("result").cloned();
467    let error = message.get("error").cloned();
468
469    match (method, id, result, error) {
470        (Some(method), Some(id), None, None) => {
471            let params = message.get("params").cloned().unwrap_or(Value::Null);
472            let _ = inbound_tx.send(AppServerInbound::ServerRequest {
473                runtime_id: runtime_id.to_string(),
474                id,
475                method: method.to_string(),
476                params,
477            });
478        }
479        (Some(method), None, None, None) => {
480            let params = message.get("params").cloned().unwrap_or(Value::Null);
481            let _ = inbound_tx.send(AppServerInbound::Notification {
482                runtime_id: runtime_id.to_string(),
483                method: method.to_string(),
484                params,
485            });
486        }
487        (None, Some(id), Some(result), _) => {
488            let key = json_string(&id);
489            if let Some(sender) = running.pending.lock().await.remove(&key) {
490                let _ = sender.send(Ok(result));
491            }
492        }
493        (None, Some(id), _, Some(error_value)) => {
494            let key = json_string(&id);
495            if let Some(sender) = running.pending.lock().await.remove(&key) {
496                let payload = serde_json::from_value::<RpcErrorPayload>(error_value)?;
497                let _ = sender.send(Err(payload));
498            }
499        }
500        _ => {
501            warn!("收到未知 app-server 消息: {line}");
502        }
503    }
504
505    Ok(())
506}
507
508async fn fail_pending_requests(running: &RunningAppServer, message: &str) {
509    let mut pending = running.pending.lock().await;
510    for (_, sender) in pending.drain() {
511        let _ = sender.send(Err(RpcErrorPayload {
512            code: -32001,
513            message: message.to_string(),
514            data: None,
515        }));
516    }
517}
518
519fn parse_initialize_info(value: &Value) -> Result<InitializeInfo> {
520    Ok(InitializeInfo {
521        user_agent: required_string(value, "userAgent")?.to_string(),
522        codex_home: required_string(value, "codexHome")?.to_string(),
523        platform_family: required_string(value, "platformFamily")?.to_string(),
524        platform_os: required_string(value, "platformOs")?.to_string(),
525    })
526}
527
528fn required_string<'a>(value: &'a Value, key: &str) -> Result<&'a str> {
529    value
530        .get(key)
531        .and_then(Value::as_str)
532        .with_context(|| format!("缺少字段 {key}"))
533}
534
535fn build_spawn_error_context(launch_config: &AppServerLaunchConfig) -> String {
536    let cwd = std::env::current_dir()
537        .map(|path| path.display().to_string())
538        .unwrap_or_else(|_| "<unknown>".to_string());
539    let path_env = std::env::var("PATH").unwrap_or_else(|_| "<unset>".to_string());
540    let codex_home = launch_config
541        .codex_home
542        .as_ref()
543        .map(|path| path.display().to_string())
544        .unwrap_or_else(|| "<unset>".to_string());
545    format!(
546        "启动 {} app-server 失败(runtime={} cwd={} CODEX_HOME={} PATH={})",
547        launch_config.codex_binary, launch_config.runtime_id, cwd, codex_home, path_env
548    )
549}
550
/// Classifies one stderr line and returns `(level, message)`.
///
/// `message` is the trimmed line. `level` is derived from the first matching rule, compared
/// case-insensitively: `ERROR` → `"error"`, `WARN` or `WARNING` → `"warn"`, `DEBUG` → `"debug"`,
/// otherwise `"info"`. A tag only counts as a whole word: it must be followed by a space and
/// either open the line or be preceded by a space.
fn parse_app_server_stderr_line(line: &str) -> (String, String) {
    let normalized = line.trim();
    // With a space in front, the single pattern " TAG " matches a tag at the start of the line
    // as well as one in the middle, so every tag gets the same treatment.
    let padded = format!(" {}", normalized.to_uppercase());
    let level = if padded.contains(" ERROR ") {
        "error"
    } else if padded.contains(" WARN ") || padded.contains(" WARNING ") {
        "warn"
    } else if padded.contains(" DEBUG ") {
        "debug"
    } else {
        "info"
    };
    (level.to_string(), normalized.to_string())
}