use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use anyhow::{Context, Result, anyhow, bail};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::{Mutex, mpsc, oneshot};
use tokio::time::{Duration, sleep, timeout};
use tracing::warn;
use crate::bridge_protocol::{json_string, now_millis};
/// Value sent as `capabilities.experimentalApi` in the `initialize` request and
/// echoed in the handshake-related `AppServerInbound` events.
const APP_SERVER_EXPERIMENTAL_API_ENABLED: bool = true;
/// Notification method names sent as `capabilities.optOutNotificationMethods`
/// in the `initialize` request.
///
/// NOTE(review): presumably the server stops emitting these notifications once
/// they are listed here — confirm against the app-server protocol.
const APP_SERVER_OPT_OUT_NOTIFICATION_METHODS: &[&str] = &[
"account/login/completed",
"account/rateLimits/updated",
"account/updated",
"app/list/updated",
"fs/changed",
"fuzzyFileSearch/sessionCompleted",
"fuzzyFileSearch/sessionUpdated",
"hook/completed",
"hook/started",
"item/autoApprovalReview/completed",
"item/autoApprovalReview/started",
"item/commandExecution/terminalInteraction",
"mcpServer/oauthLogin/completed",
"mcpServer/startupStatus/updated",
"skills/changed",
"thread/compacted",
"thread/realtime/closed",
"thread/realtime/error",
"thread/realtime/itemAdded",
"thread/realtime/outputAudio/delta",
"thread/realtime/sdp",
"thread/realtime/started",
"thread/realtime/transcriptUpdated",
"windows/worldWritableWarning",
"windowsSandbox/setupCompleted",
];
/// Parameters used to launch one app-server child process.
#[derive(Debug, Clone)]
pub struct AppServerLaunchConfig {
/// Identifier attached to every inbound event and exported to the child as
/// `CODEX_MOBILE_RUNTIME_ID`.
pub runtime_id: String,
/// Program passed to `Command::new`; it is invoked with
/// `app-server --listen stdio://`.
pub codex_binary: String,
/// When set, exported to the child as `CODEX_HOME`.
pub codex_home: Option<PathBuf>,
}
/// String fields extracted from the result of the `initialize` request.
#[derive(Debug, Clone)]
pub struct InitializeInfo {
/// Taken from the `userAgent` key of the response.
pub user_agent: String,
/// Taken from the `codexHome` key of the response.
pub codex_home: String,
/// Taken from the `platformFamily` key of the response.
pub platform_family: String,
/// Taken from the `platformOs` key of the response.
pub platform_os: String,
}
/// Events pushed onto the inbound channel by the manager and its background
/// tasks. Every variant carries the `runtime_id` of the originating runtime.
#[derive(Debug, Clone)]
pub enum AppServerInbound {
/// Sent right before a new child process is spawned.
Starting {
runtime_id: String,
},
/// Sent after a successful spawn (`running: true`, with the child's pid)
/// and again once the child has exited (`running: false`, `pid: None`).
ProcessChanged {
runtime_id: String,
pid: Option<u32>,
running: bool,
},
/// Sent right before the `initialize` request is issued; carries the
/// capabilities that are about to be requested.
Initializing {
runtime_id: String,
experimental_api_enabled: bool,
opt_out_notification_methods: Vec<String>,
},
/// Sent after the `initialize` response was parsed and the `initialized`
/// notification was written successfully.
Initialized {
runtime_id: String,
info: InitializeInfo,
experimental_api_enabled: bool,
opt_out_notification_methods: Vec<String>,
},
/// Sent when any step of the handshake fails; the process is killed
/// afterwards.
HandshakeFailed {
runtime_id: String,
message: String,
experimental_api_enabled: bool,
opt_out_notification_methods: Vec<String>,
},
/// A stdout message that has a `method` but no `id`, `result` or `error`.
/// `params` is `Value::Null` when the message has none.
Notification {
runtime_id: String,
method: String,
params: Value,
},
/// A stdout message that has both a `method` and an `id` (and no `result`
/// or `error`). `params` is `Value::Null` when the message has none.
ServerRequest {
runtime_id: String,
id: Value,
method: String,
params: Value,
},
/// Sent once the child process is gone. `expected` is true when the exit
/// followed an explicit stop request.
Exited {
runtime_id: String,
message: String,
expected: bool,
},
/// One non-empty line read from the child's stderr, with a level guessed
/// from the line's text.
LogChunk {
runtime_id: String,
stream: String,
level: String,
source: String,
message: String,
detail: Option<Value>,
occurred_at_ms: i64,
},
}
/// The `error` object of a JSON-RPC response, also used internally to fail
/// pending requests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcErrorPayload {
/// Numeric error code.
pub code: i64,
/// Human-readable error message.
pub message: String,
/// Optional extra payload; deserializes to `None` when the key is absent.
#[serde(default)]
pub data: Option<Value>,
}
/// Renders the error as `[code] message`; `data` is not included.
impl std::fmt::Display for RpcErrorPayload {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { code, message, .. } = self;
        write!(f, "[{}] {}", code, message)
    }
}
/// Owns at most one running app-server process and starts it on demand.
pub struct AppServerManager {
// Launch parameters reused for every (re)start.
launch_config: AppServerLaunchConfig,
// Channel on which lifecycle events and server messages are published.
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
// The current process handle, if any; `None` after `stop` or a failed handshake.
inner: Mutex<Option<Arc<RunningAppServer>>>,
}
impl AppServerManager {
pub fn new(
launch_config: AppServerLaunchConfig,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) -> Self {
Self {
launch_config,
inbound_tx,
inner: Mutex::new(None),
}
}
pub fn runtime_id(&self) -> &str {
&self.launch_config.runtime_id
}
pub async fn start(&self) -> Result<()> {
self.ensure_started().await.map(|_| ())
}
pub async fn stop(&self) -> Result<()> {
let existing = {
let mut guard = self.inner.lock().await;
guard.take()
};
if let Some(existing) = existing {
existing.stop().await?;
for _ in 0..30 {
if !existing.is_alive() {
break;
}
sleep(Duration::from_millis(100)).await;
}
}
Ok(())
}
pub async fn restart(&self) -> Result<()> {
self.stop().await?;
self.start().await
}
pub async fn request(&self, method: &str, params: Value) -> Result<Value> {
let running = self.ensure_started().await?;
running.request(method, params).await
}
pub async fn respond(&self, id: Value, result: Value) -> Result<()> {
let running = self.ensure_started().await?;
running.respond(id, result).await
}
pub async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
let running = self.ensure_started().await?;
running.respond_error(id, code, message).await
}
async fn ensure_started(&self) -> Result<Arc<RunningAppServer>> {
{
let guard = self.inner.lock().await;
if let Some(existing) = guard.as_ref().filter(|existing| existing.is_alive()) {
return Ok(Arc::clone(existing));
}
}
let opt_out_notification_methods = default_opt_out_notification_methods();
let _ = self.inbound_tx.send(AppServerInbound::Starting {
runtime_id: self.launch_config.runtime_id.clone(),
});
let running =
RunningAppServer::spawn(self.launch_config.clone(), self.inbound_tx.clone()).await?;
{
let mut guard = self.inner.lock().await;
*guard = Some(Arc::clone(&running));
}
let _ = self.inbound_tx.send(AppServerInbound::Initializing {
runtime_id: self.launch_config.runtime_id.clone(),
experimental_api_enabled: APP_SERVER_EXPERIMENTAL_API_ENABLED,
opt_out_notification_methods: opt_out_notification_methods.clone(),
});
let init_result = match running
.request(
"initialize",
json!({
"clientInfo": {
"name": "codex-mobile-bridge",
"title": "Codex Mobile Bridge",
"version": env!("CARGO_PKG_VERSION"),
},
"capabilities": {
"experimentalApi": APP_SERVER_EXPERIMENTAL_API_ENABLED,
"optOutNotificationMethods": opt_out_notification_methods,
}
}),
)
.await
{
Ok(result) => result,
Err(error) => {
self.abort_startup(
&running,
APP_SERVER_EXPERIMENTAL_API_ENABLED,
&default_opt_out_notification_methods(),
format!("initialize 失败: {error}"),
)
.await;
return Err(error);
}
};
let info = match parse_initialize_info(&init_result) {
Ok(info) => info,
Err(error) => {
self.abort_startup(
&running,
APP_SERVER_EXPERIMENTAL_API_ENABLED,
&default_opt_out_notification_methods(),
format!("initialize 响应解析失败: {error}"),
)
.await;
return Err(error);
}
};
if let Err(error) = running.notify_initialized().await {
self.abort_startup(
&running,
APP_SERVER_EXPERIMENTAL_API_ENABLED,
&default_opt_out_notification_methods(),
format!("initialized 发送失败: {error}"),
)
.await;
return Err(error);
}
let _ = self.inbound_tx.send(AppServerInbound::Initialized {
runtime_id: self.launch_config.runtime_id.clone(),
info,
experimental_api_enabled: APP_SERVER_EXPERIMENTAL_API_ENABLED,
opt_out_notification_methods: default_opt_out_notification_methods(),
});
Ok(running)
}
async fn abort_startup(
&self,
running: &Arc<RunningAppServer>,
experimental_api_enabled: bool,
opt_out_notification_methods: &[String],
message: String,
) {
let _ = self.inbound_tx.send(AppServerInbound::HandshakeFailed {
runtime_id: self.launch_config.runtime_id.clone(),
message,
experimental_api_enabled,
opt_out_notification_methods: opt_out_notification_methods.to_vec(),
});
let _ = running.abort().await;
let mut guard = self.inner.lock().await;
if guard
.as_ref()
.map(|existing| Arc::ptr_eq(existing, running))
.unwrap_or(false)
{
*guard = None;
}
}
}
/// Handle to one spawned app-server process and its JSON-RPC bookkeeping.
struct RunningAppServer {
// Runtime id copied from the launch configuration; used in messages and events.
runtime_id: String,
// Buffered writer over the child's stdin; every outgoing message goes through it.
stdin: Arc<Mutex<BufWriter<ChildStdin>>>,
// The child process; reset to `None` by the wait task once the exit was observed.
child: Arc<Mutex<Option<Child>>>,
// Callers waiting for a response, keyed by the request id rendered as a string.
pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Value, RpcErrorPayload>>>>>,
// Source of request ids; starts at 1.
next_id: AtomicU64,
// Set to false by the wait task when the process is gone.
alive: Arc<AtomicBool>,
// Set by `stop`; reported as `expected` in the `Exited` event.
stopping: Arc<AtomicBool>,
}
impl RunningAppServer {
    /// Spawns `<codex_binary> app-server --listen stdio://` with piped stdio,
    /// emits `ProcessChanged { running: true }` and starts the stdout, stderr
    /// and wait background tasks.
    ///
    /// # Errors
    /// Fails when the process cannot be spawned or one of its stdio pipes is
    /// unavailable.
    async fn spawn(
        launch_config: AppServerLaunchConfig,
        inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
    ) -> Result<Arc<Self>> {
        let mut command = Command::new(&launch_config.codex_binary);
        command.args(["app-server", "--listen", "stdio://"]);
        command.stdin(std::process::Stdio::piped());
        command.stdout(std::process::Stdio::piped());
        command.stderr(std::process::Stdio::piped());
        command.kill_on_drop(true);
        command.env("CODEX_MOBILE_MANAGED", "1");
        command.env("CODEX_MOBILE_RUNTIME_ID", &launch_config.runtime_id);
        if let Some(codex_home) = launch_config.codex_home.as_ref() {
            command.env("CODEX_HOME", codex_home);
        }

        let mut child = command
            .spawn()
            .with_context(|| build_spawn_error_context(&launch_config))?;
        let pid = child.id();
        let stdin = child.stdin.take().context("获取 app-server stdin 失败")?;
        let stdout = child.stdout.take().context("获取 app-server stdout 失败")?;
        let stderr = child.stderr.take().context("获取 app-server stderr 失败")?;

        let running = Arc::new(Self {
            runtime_id: launch_config.runtime_id.clone(),
            stdin: Arc::new(Mutex::new(BufWriter::new(stdin))),
            child: Arc::new(Mutex::new(Some(child))),
            pending: Arc::new(Mutex::new(HashMap::new())),
            next_id: AtomicU64::new(1),
            alive: Arc::new(AtomicBool::new(true)),
            stopping: Arc::new(AtomicBool::new(false)),
        });

        let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
            runtime_id: launch_config.runtime_id.clone(),
            pid,
            running: true,
        });

        spawn_stdout_task(
            Arc::clone(&running),
            launch_config.runtime_id.clone(),
            stdout,
            inbound_tx.clone(),
        );
        spawn_stderr_task(launch_config.runtime_id.clone(), stderr, inbound_tx.clone());
        spawn_wait_task(Arc::clone(&running), inbound_tx);
        Ok(running)
    }

    /// Whether the wait task has not yet observed the process exiting.
    fn is_alive(&self) -> bool {
        self.alive.load(Ordering::SeqCst)
    }

    /// Requests an intentional shutdown: marks the exit as expected and kills
    /// the process. Does nothing when the process is already gone.
    async fn stop(&self) -> Result<()> {
        if !self.is_alive() {
            return Ok(());
        }
        self.stopping.store(true, Ordering::SeqCst);
        self.kill_process().await
    }

    /// Kills the process without marking the exit as expected.
    async fn abort(&self) -> Result<()> {
        if !self.is_alive() {
            return Ok(());
        }
        self.kill_process().await
    }

    /// Starts killing the child without waiting for it to exit; the wait task
    /// observes the exit. A child handle that was already cleared is not an error.
    async fn kill_process(&self) -> Result<()> {
        let mut child_guard = self.child.lock().await;
        let Some(child) = child_guard.as_mut() else {
            return Ok(());
        };
        child
            .start_kill()
            .with_context(|| format!("停止 runtime {} 失败", self.runtime_id))?;
        Ok(())
    }

    /// Sends a JSON-RPC request and waits up to 90 seconds for its response.
    ///
    /// # Errors
    /// Fails when the process is not alive, the message cannot be written, the
    /// server answers with an error, the waiter is dropped, or the wait times
    /// out. The pending entry is removed on the write-failure and timeout
    /// paths, so a failed call leaves no stale waiter behind.
    async fn request(&self, method: &str, params: Value) -> Result<Value> {
        if !self.is_alive() {
            bail!("app-server 未运行");
        }

        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
        let key = id.to_string();
        let (tx, rx) = oneshot::channel();
        // Register the waiter before writing so a fast response cannot be missed.
        self.pending.lock().await.insert(key.clone(), tx);

        let payload = json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params,
        });
        if let Err(error) = self.send_json(payload).await {
            // Nothing was sent, so no response will ever arrive for this id.
            self.pending.lock().await.remove(&key);
            return Err(error);
        }

        match timeout(Duration::from_secs(90), rx).await {
            Ok(Ok(Ok(result))) => Ok(result),
            Ok(Ok(Err(error))) => Err(anyhow!(error.to_string())),
            Ok(Err(_)) => Err(anyhow!("等待 app-server 响应时通道关闭")),
            Err(_) => {
                self.pending.lock().await.remove(&key);
                Err(anyhow!("等待 app-server 响应超时"))
            }
        }
    }

    /// Writes a successful JSON-RPC response for request `id`.
    async fn respond(&self, id: Value, result: Value) -> Result<()> {
        self.send_json(json!({
            "jsonrpc": "2.0",
            "id": id,
            "result": result,
        }))
        .await
    }

    /// Writes a JSON-RPC error response for request `id`.
    async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
        self.send_json(json!({
            "jsonrpc": "2.0",
            "id": id,
            "error": {
                "code": code,
                "message": message,
            }
        }))
        .await
    }

    /// Serializes `payload` onto a single line, writes it to the child's stdin
    /// and flushes. The stdin lock is held for the whole write so concurrent
    /// messages cannot interleave.
    async fn send_json(&self, payload: Value) -> Result<()> {
        let line = serde_json::to_string(&payload)?;
        let mut writer = self.stdin.lock().await;
        writer.write_all(line.as_bytes()).await?;
        writer.write_all(b"\n").await?;
        writer.flush().await?;
        Ok(())
    }

    /// Writes a JSON-RPC notification; the `params` key is omitted when
    /// `params` is `None`.
    async fn send_notification(&self, method: &str, params: Option<Value>) -> Result<()> {
        let mut payload = json!({
            "jsonrpc": "2.0",
            "method": method,
        });
        if let Some(params) = params {
            payload["params"] = params;
        }
        self.send_json(payload).await
    }

    /// Sends the parameterless `initialized` notification that ends the handshake.
    async fn notify_initialized(&self) -> Result<()> {
        self.send_notification("initialized", None).await
    }
}
/// Starts a background task that reads the child's stdout line by line and
/// hands every non-blank line to `handle_stdout_line`.
///
/// A line that fails to process is logged and skipped. The task ends on EOF or
/// on the first read error.
fn spawn_stdout_task(
    running: Arc<RunningAppServer>,
    runtime_id: String,
    stdout: tokio::process::ChildStdout,
    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
    tokio::spawn(async move {
        let mut lines = BufReader::new(stdout).lines();
        loop {
            let line = match lines.next_line().await {
                Ok(Some(line)) => line,
                Ok(None) => break,
                Err(error) => {
                    warn!("读取 app-server stdout 失败: {error}");
                    break;
                }
            };
            if line.trim().is_empty() {
                continue;
            }
            let outcome = handle_stdout_line(&running, &runtime_id, &inbound_tx, &line).await;
            if let Err(error) = outcome {
                warn!("解析 app-server 输出失败: {error}");
            }
        }
    });
}
/// Starts a background task that forwards every non-blank stderr line of the
/// child as a `LogChunk` event and mirrors it to the local log at warn level.
///
/// The task ends on EOF or on the first read error.
fn spawn_stderr_task(
    runtime_id: String,
    stderr: tokio::process::ChildStderr,
    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
    tokio::spawn(async move {
        let mut lines = BufReader::new(stderr).lines();
        loop {
            let raw = match lines.next_line().await {
                Ok(Some(raw)) => raw,
                Ok(None) => break,
                Err(error) => {
                    warn!("读取 app-server stderr 失败: {error}");
                    break;
                }
            };
            let trimmed = raw.trim();
            if trimmed.is_empty() {
                continue;
            }
            let (level, message) = parse_app_server_stderr_line(trimmed);
            warn!("app-server stderr [{runtime_id}]: {trimmed}");
            let chunk = AppServerInbound::LogChunk {
                runtime_id: runtime_id.clone(),
                stream: "stderr".to_string(),
                level,
                source: "app-server".to_string(),
                message,
                detail: None,
                occurred_at_ms: now_millis(),
            };
            let _ = inbound_tx.send(chunk);
        }
    });
}
/// Starts a background task that polls the child every 300 ms until it has
/// exited, then marks the server dead, fails all pending requests and emits
/// `ProcessChanged { running: false }` followed by `Exited`.
fn spawn_wait_task(
    running: Arc<RunningAppServer>,
    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
    tokio::spawn(async move {
        let message = loop {
            {
                // Hold the child lock only for the poll itself so that
                // `kill_process` can acquire it between polls.
                let mut slot = running.child.lock().await;
                let Some(child) = slot.as_mut() else {
                    break format!("app-server 已退出: runtime {}", running.runtime_id);
                };
                match child.try_wait() {
                    Ok(None) => {}
                    Ok(Some(status)) => {
                        *slot = None;
                        break format!("app-server 已退出: {status}");
                    }
                    Err(error) => {
                        *slot = None;
                        break format!("等待 app-server 退出失败: {error}");
                    }
                }
            }
            sleep(Duration::from_millis(300)).await;
        };

        running.alive.store(false, Ordering::SeqCst);
        fail_pending_requests(&running, &message).await;

        let runtime_id = running.runtime_id.clone();
        let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
            runtime_id: runtime_id.clone(),
            pid: None,
            running: false,
        });
        let expected = running.stopping.load(Ordering::SeqCst);
        let _ = inbound_tx.send(AppServerInbound::Exited {
            runtime_id,
            message,
            expected,
        });
    });
}
async fn handle_stdout_line(
running: &Arc<RunningAppServer>,
runtime_id: &str,
inbound_tx: &mpsc::UnboundedSender<AppServerInbound>,
line: &str,
) -> Result<()> {
let message: Value = serde_json::from_str(line)?;
let method = message.get("method").and_then(Value::as_str);
let id = message.get("id").cloned();
let result = message.get("result").cloned();
let error = message.get("error").cloned();
match (method, id, result, error) {
(Some(method), Some(id), None, None) => {
let params = message.get("params").cloned().unwrap_or(Value::Null);
let _ = inbound_tx.send(AppServerInbound::ServerRequest {
runtime_id: runtime_id.to_string(),
id,
method: method.to_string(),
params,
});
}
(Some(method), None, None, None) => {
let params = message.get("params").cloned().unwrap_or(Value::Null);
let _ = inbound_tx.send(AppServerInbound::Notification {
runtime_id: runtime_id.to_string(),
method: method.to_string(),
params,
});
}
(None, Some(id), Some(result), _) => {
let key = json_string(&id);
if let Some(sender) = running.pending.lock().await.remove(&key) {
let _ = sender.send(Ok(result));
}
}
(None, Some(id), _, Some(error_value)) => {
let key = json_string(&id);
if let Some(sender) = running.pending.lock().await.remove(&key) {
let payload = serde_json::from_value::<RpcErrorPayload>(error_value)?;
let _ = sender.send(Err(payload));
}
}
_ => {
warn!("收到未知 app-server 消息: {line}");
}
}
Ok(())
}
/// Empties the pending-request table and fails every waiter with error code
/// `-32001` carrying `message`.
async fn fail_pending_requests(running: &RunningAppServer, message: &str) {
    let failure = RpcErrorPayload {
        code: -32001,
        message: message.to_owned(),
        data: None,
    };
    let mut waiting = running.pending.lock().await;
    waiting.drain().for_each(|(_, reply)| {
        // A waiter that has already gone away is not an error.
        let _ = reply.send(Err(failure.clone()));
    });
}
/// Builds an `InitializeInfo` from the `initialize` result object.
///
/// # Errors
/// Fails on the first of `userAgent`, `codexHome`, `platformFamily`,
/// `platformOs` that is missing or not a string.
fn parse_initialize_info(value: &Value) -> Result<InitializeInfo> {
    let field = |key: &str| required_string(value, key).map(str::to_owned);

    let user_agent = field("userAgent")?;
    let codex_home = field("codexHome")?;
    let platform_family = field("platformFamily")?;
    let platform_os = field("platformOs")?;

    Ok(InitializeInfo {
        user_agent,
        codex_home,
        platform_family,
        platform_os,
    })
}
/// Looks up `key` in `value` and returns it as a string slice.
///
/// # Errors
/// Fails when the key is absent or its value is not a JSON string.
fn required_string<'a>(value: &'a Value, key: &str) -> Result<&'a str> {
    let text = value.get(key).and_then(|field| field.as_str());
    text.with_context(|| format!("缺少字段 {key}"))
}
fn build_spawn_error_context(launch_config: &AppServerLaunchConfig) -> String {
let cwd = std::env::current_dir()
.map(|path| path.display().to_string())
.unwrap_or_else(|_| "<unknown>".to_string());
let path_env = std::env::var("PATH").unwrap_or_else(|_| "<unset>".to_string());
let codex_home = launch_config
.codex_home
.as_ref()
.map(|path| path.display().to_string())
.unwrap_or_else(|| "<unset>".to_string());
format!(
"启动 {} app-server 失败(runtime={} cwd={} CODEX_HOME={} PATH={})",
launch_config.codex_binary, launch_config.runtime_id, cwd, codex_home, path_env
)
}
/// Guesses a log level for one stderr line and returns `(level, trimmed_line)`.
///
/// The match is case-insensitive. A level word counts when it starts the line
/// followed by a space, or appears surrounded by spaces. Precedence is
/// `error`, then `warn` (`WARN` or `WARNING`), then `debug`; anything else is
/// `info`.
fn parse_app_server_stderr_line(line: &str) -> (String, String) {
    let normalized = line.trim();
    let upper = normalized.to_uppercase();
    let has_tag = |prefix: &str, infix: &str| upper.starts_with(prefix) || upper.contains(infix);

    let level = if has_tag("ERROR ", " ERROR ") {
        "error"
    } else if has_tag("WARN ", " WARN ") || has_tag("WARNING ", " WARNING ") {
        // A leading "WARNING " is treated like a leading "WARN ".
        "warn"
    } else if has_tag("DEBUG ", " DEBUG ") {
        "debug"
    } else {
        "info"
    };
    (level.to_string(), normalized.to_string())
}
fn default_opt_out_notification_methods() -> Vec<String> {
APP_SERVER_OPT_OUT_NOTIFICATION_METHODS
.iter()
.map(|method| (*method).to_string())
.collect()
}
// Unix only: the test relies on `PermissionsExt` and an executable shebang script.
#[cfg(all(test, unix))]
mod tests {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;
    use tempfile::tempdir;
    use tokio::sync::mpsc;
    use tokio::time::{Duration, timeout};

    use super::{
        APP_SERVER_EXPERIMENTAL_API_ENABLED, AppServerInbound, AppServerLaunchConfig,
        AppServerManager, default_opt_out_notification_methods,
    };

    /// Launches a fake app-server (a Python script) and checks that the manager
    /// sends `initialize` with the expected capabilities followed by
    /// `initialized`, and that it emits an `Initialized` event.
    #[tokio::test]
    async fn start_sends_initialize_initialized_and_opt_out_notifications() {
        let temp_dir = tempdir().expect("创建临时目录失败");
        let log_path = temp_dir.path().join("received.json");
        let script_path = temp_dir.path().join("fake-codex");
        // The script body must start at column 0 and keep its own indentation:
        // it is Python, and the shebang has to be the first bytes of the file.
        fs::write(
            &script_path,
            format!(
                r#"#!/usr/bin/env python3
import json
import pathlib
import sys

messages = []
log_path = pathlib.Path({log_path:?})
for raw_line in sys.stdin:
    line = raw_line.strip()
    if not line:
        continue
    message = json.loads(line)
    messages.append(message)
    if message.get("method") == "initialize":
        print(json.dumps({{
            "jsonrpc": "2.0",
            "id": message["id"],
            "result": {{
                "userAgent": "codex-test",
                "codexHome": "/tmp/codex-home",
                "platformFamily": "unix",
                "platformOs": "linux"
            }}
        }}), flush=True)
    elif message.get("method") == "initialized":
        log_path.write_text(json.dumps(messages))
        break
"#,
                log_path = log_path.display().to_string(),
            ),
        )
        .expect("写入 fake codex 脚本失败");

        let mut permissions = fs::metadata(&script_path)
            .expect("读取脚本权限失败")
            .permissions();
        permissions.set_mode(0o755);
        fs::set_permissions(&script_path, permissions).expect("设置脚本权限失败");

        let (inbound_tx, mut inbound_rx) = mpsc::unbounded_channel();
        let manager = AppServerManager::new(
            AppServerLaunchConfig {
                runtime_id: "primary".to_string(),
                codex_binary: script_path.display().to_string(),
                codex_home: None,
            },
            inbound_tx,
        );
        manager.start().await.expect("启动 app-server manager 失败");

        let initialized = timeout(Duration::from_secs(5), async {
            loop {
                match inbound_rx.recv().await {
                    Some(AppServerInbound::Initialized {
                        experimental_api_enabled,
                        opt_out_notification_methods,
                        ..
                    }) => break (experimental_api_enabled, opt_out_notification_methods),
                    Some(_) => {}
                    None => panic!("inbound channel 意外关闭"),
                }
            }
        })
        .await
        .expect("等待 Initialized 事件超时");
        assert!(initialized.0);
        assert_eq!(initialized.1, default_opt_out_notification_methods());

        let recorded = timeout(Duration::from_secs(5), async {
            loop {
                // `write_text` creates the file before writing its contents, so
                // only a non-empty read is accepted; otherwise retry.
                match fs::read_to_string(&log_path) {
                    Ok(contents) if !contents.is_empty() => break contents,
                    _ => tokio::time::sleep(Duration::from_millis(50)).await,
                }
            }
        })
        .await
        .expect("等待 fake codex 记录消息超时");

        let messages: serde_json::Value =
            serde_json::from_str(&recorded).expect("解析记录消息失败");
        let entries = messages.as_array().expect("记录消息应为数组");
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0]["method"], "initialize");
        assert_eq!(
            entries[0]["params"]["capabilities"]["experimentalApi"],
            APP_SERVER_EXPERIMENTAL_API_ENABLED
        );
        assert_eq!(
            entries[0]["params"]["capabilities"]["optOutNotificationMethods"],
            serde_json::to_value(default_opt_out_notification_methods())
                .expect("序列化 opt-out 列表失败")
        );
        assert_eq!(entries[1]["method"], "initialized");
    }
}