use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use anyhow::{Context, Result, anyhow, bail};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::{Mutex, mpsc, oneshot};
use tokio::time::{Duration, sleep, timeout};
use tracing::warn;
use crate::bridge_protocol::{json_string, now_millis};
#[derive(Debug, Clone)]
pub struct AppServerLaunchConfig {
pub runtime_id: String,
pub codex_binary: String,
pub codex_home: Option<PathBuf>,
}
#[derive(Debug, Clone)]
pub struct InitializeInfo {
pub user_agent: String,
pub codex_home: String,
pub platform_family: String,
pub platform_os: String,
}
#[derive(Debug, Clone)]
pub enum AppServerInbound {
Starting {
runtime_id: String,
},
ProcessChanged {
runtime_id: String,
pid: Option<u32>,
running: bool,
},
Initialized {
runtime_id: String,
info: InitializeInfo,
},
Notification {
runtime_id: String,
method: String,
params: Value,
},
ServerRequest {
runtime_id: String,
id: Value,
method: String,
params: Value,
},
Exited {
runtime_id: String,
message: String,
expected: bool,
},
LogChunk {
runtime_id: String,
stream: String,
level: String,
source: String,
message: String,
detail: Option<Value>,
occurred_at_ms: i64,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcErrorPayload {
pub code: i64,
pub message: String,
#[serde(default)]
pub data: Option<Value>,
}
impl std::fmt::Display for RpcErrorPayload {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[{}] {}", self.code, self.message)
}
}
pub struct AppServerManager {
launch_config: AppServerLaunchConfig,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
inner: Mutex<Option<Arc<RunningAppServer>>>,
}
impl AppServerManager {
pub fn new(
launch_config: AppServerLaunchConfig,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) -> Self {
Self {
launch_config,
inbound_tx,
inner: Mutex::new(None),
}
}
pub fn runtime_id(&self) -> &str {
&self.launch_config.runtime_id
}
pub async fn start(&self) -> Result<()> {
self.ensure_started().await.map(|_| ())
}
pub async fn stop(&self) -> Result<()> {
let existing = {
let mut guard = self.inner.lock().await;
guard.take()
};
if let Some(existing) = existing {
existing.stop().await?;
for _ in 0..30 {
if !existing.is_alive() {
break;
}
sleep(Duration::from_millis(100)).await;
}
}
Ok(())
}
pub async fn restart(&self) -> Result<()> {
self.stop().await?;
self.start().await
}
pub async fn request(&self, method: &str, params: Value) -> Result<Value> {
let running = self.ensure_started().await?;
running.request(method, params).await
}
pub async fn respond(&self, id: Value, result: Value) -> Result<()> {
let running = self.ensure_started().await?;
running.respond(id, result).await
}
pub async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
let running = self.ensure_started().await?;
running.respond_error(id, code, message).await
}
async fn ensure_started(&self) -> Result<Arc<RunningAppServer>> {
{
let guard = self.inner.lock().await;
if let Some(existing) = guard.as_ref().filter(|existing| existing.is_alive()) {
return Ok(Arc::clone(existing));
}
}
let _ = self.inbound_tx.send(AppServerInbound::Starting {
runtime_id: self.launch_config.runtime_id.clone(),
});
let running =
RunningAppServer::spawn(self.launch_config.clone(), self.inbound_tx.clone()).await?;
{
let mut guard = self.inner.lock().await;
*guard = Some(Arc::clone(&running));
}
let init_result = running
.request(
"initialize",
json!({
"clientInfo": {
"name": "codex-mobile-bridge",
"title": "Codex Mobile Bridge",
"version": env!("CARGO_PKG_VERSION"),
},
"capabilities": {
"experimentalApi": true
}
}),
)
.await?;
let info = parse_initialize_info(&init_result)?;
let _ = self.inbound_tx.send(AppServerInbound::Initialized {
runtime_id: self.launch_config.runtime_id.clone(),
info,
});
Ok(running)
}
}
struct RunningAppServer {
runtime_id: String,
stdin: Arc<Mutex<BufWriter<ChildStdin>>>,
child: Arc<Mutex<Option<Child>>>,
pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Value, RpcErrorPayload>>>>>,
next_id: AtomicU64,
alive: Arc<AtomicBool>,
stopping: Arc<AtomicBool>,
}
impl RunningAppServer {
async fn spawn(
launch_config: AppServerLaunchConfig,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) -> Result<Arc<Self>> {
let mut command = Command::new(&launch_config.codex_binary);
command.args(["app-server", "--listen", "stdio://"]);
command.stdin(std::process::Stdio::piped());
command.stdout(std::process::Stdio::piped());
command.stderr(std::process::Stdio::piped());
command.kill_on_drop(true);
command.env("CODEX_MOBILE_MANAGED", "1");
command.env("CODEX_MOBILE_RUNTIME_ID", &launch_config.runtime_id);
if let Some(codex_home) = launch_config.codex_home.as_ref() {
command.env("CODEX_HOME", codex_home);
}
let mut child = command
.spawn()
.with_context(|| build_spawn_error_context(&launch_config))?;
let pid = child.id();
let stdin = child.stdin.take().context("获取 app-server stdin 失败")?;
let stdout = child.stdout.take().context("获取 app-server stdout 失败")?;
let stderr = child.stderr.take().context("获取 app-server stderr 失败")?;
let running = Arc::new(Self {
runtime_id: launch_config.runtime_id.clone(),
stdin: Arc::new(Mutex::new(BufWriter::new(stdin))),
child: Arc::new(Mutex::new(Some(child))),
pending: Arc::new(Mutex::new(HashMap::new())),
next_id: AtomicU64::new(1),
alive: Arc::new(AtomicBool::new(true)),
stopping: Arc::new(AtomicBool::new(false)),
});
let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
runtime_id: launch_config.runtime_id.clone(),
pid,
running: true,
});
spawn_stdout_task(
Arc::clone(&running),
launch_config.runtime_id.clone(),
stdout,
inbound_tx.clone(),
);
spawn_stderr_task(launch_config.runtime_id.clone(), stderr, inbound_tx.clone());
spawn_wait_task(Arc::clone(&running), inbound_tx);
Ok(running)
}
fn is_alive(&self) -> bool {
self.alive.load(Ordering::SeqCst)
}
async fn stop(&self) -> Result<()> {
if !self.is_alive() {
return Ok(());
}
self.stopping.store(true, Ordering::SeqCst);
let mut child_guard = self.child.lock().await;
let Some(child) = child_guard.as_mut() else {
return Ok(());
};
child
.start_kill()
.with_context(|| format!("停止 runtime {} 失败", self.runtime_id))?;
Ok(())
}
async fn request(&self, method: &str, params: Value) -> Result<Value> {
if !self.is_alive() {
bail!("app-server 未运行");
}
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let key = id.to_string();
let (tx, rx) = oneshot::channel();
{
let mut pending = self.pending.lock().await;
pending.insert(key.clone(), tx);
}
self.send_json(json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params,
}))
.await?;
match timeout(Duration::from_secs(90), rx).await {
Ok(Ok(Ok(result))) => Ok(result),
Ok(Ok(Err(error))) => Err(anyhow!(error.to_string())),
Ok(Err(_)) => Err(anyhow!("等待 app-server 响应时通道关闭")),
Err(_) => {
self.pending.lock().await.remove(&key);
Err(anyhow!("等待 app-server 响应超时"))
}
}
}
async fn respond(&self, id: Value, result: Value) -> Result<()> {
self.send_json(json!({
"jsonrpc": "2.0",
"id": id,
"result": result,
}))
.await
}
async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
self.send_json(json!({
"jsonrpc": "2.0",
"id": id,
"error": {
"code": code,
"message": message,
}
}))
.await
}
async fn send_json(&self, payload: Value) -> Result<()> {
let line = serde_json::to_string(&payload)?;
let mut writer = self.stdin.lock().await;
writer.write_all(line.as_bytes()).await?;
writer.write_all(b"\n").await?;
writer.flush().await?;
Ok(())
}
}
fn spawn_stdout_task(
running: Arc<RunningAppServer>,
runtime_id: String,
stdout: tokio::process::ChildStdout,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
loop {
match reader.next_line().await {
Ok(Some(line)) => {
if line.trim().is_empty() {
continue;
}
if let Err(error) =
handle_stdout_line(&running, &runtime_id, &inbound_tx, &line).await
{
warn!("解析 app-server 输出失败: {error}");
}
}
Ok(None) => break,
Err(error) => {
warn!("读取 app-server stdout 失败: {error}");
break;
}
}
}
});
}
fn spawn_stderr_task(
runtime_id: String,
stderr: tokio::process::ChildStderr,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
loop {
match reader.next_line().await {
Ok(Some(line)) => {
let trimmed = line.trim();
if !trimmed.is_empty() {
let (level, message) = parse_app_server_stderr_line(trimmed);
warn!("app-server stderr [{runtime_id}]: {trimmed}");
let _ = inbound_tx.send(AppServerInbound::LogChunk {
runtime_id: runtime_id.clone(),
stream: "stderr".to_string(),
level,
source: "app-server".to_string(),
message,
detail: None,
occurred_at_ms: now_millis(),
});
}
}
Ok(None) => break,
Err(error) => {
warn!("读取 app-server stderr 失败: {error}");
break;
}
}
}
});
}
fn spawn_wait_task(
running: Arc<RunningAppServer>,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
tokio::spawn(async move {
let message = loop {
let maybe_exit = {
let mut guard = running.child.lock().await;
let Some(child) = guard.as_mut() else {
break format!("app-server 已退出: runtime {}", running.runtime_id);
};
match child.try_wait() {
Ok(Some(status)) => {
*guard = None;
Some(Ok(status))
}
Ok(None) => None,
Err(error) => {
*guard = None;
Some(Err(error))
}
}
};
match maybe_exit {
Some(Ok(status)) => break format!("app-server 已退出: {status}"),
Some(Err(error)) => break format!("等待 app-server 退出失败: {error}"),
None => sleep(Duration::from_millis(300)).await,
}
};
running.alive.store(false, Ordering::SeqCst);
fail_pending_requests(&running, &message).await;
let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
runtime_id: running.runtime_id.clone(),
pid: None,
running: false,
});
let _ = inbound_tx.send(AppServerInbound::Exited {
runtime_id: running.runtime_id.clone(),
message,
expected: running.stopping.load(Ordering::SeqCst),
});
});
}
async fn handle_stdout_line(
running: &Arc<RunningAppServer>,
runtime_id: &str,
inbound_tx: &mpsc::UnboundedSender<AppServerInbound>,
line: &str,
) -> Result<()> {
let message: Value = serde_json::from_str(line)?;
let method = message.get("method").and_then(Value::as_str);
let id = message.get("id").cloned();
let result = message.get("result").cloned();
let error = message.get("error").cloned();
match (method, id, result, error) {
(Some(method), Some(id), None, None) => {
let params = message.get("params").cloned().unwrap_or(Value::Null);
let _ = inbound_tx.send(AppServerInbound::ServerRequest {
runtime_id: runtime_id.to_string(),
id,
method: method.to_string(),
params,
});
}
(Some(method), None, None, None) => {
let params = message.get("params").cloned().unwrap_or(Value::Null);
let _ = inbound_tx.send(AppServerInbound::Notification {
runtime_id: runtime_id.to_string(),
method: method.to_string(),
params,
});
}
(None, Some(id), Some(result), _) => {
let key = json_string(&id);
if let Some(sender) = running.pending.lock().await.remove(&key) {
let _ = sender.send(Ok(result));
}
}
(None, Some(id), _, Some(error_value)) => {
let key = json_string(&id);
if let Some(sender) = running.pending.lock().await.remove(&key) {
let payload = serde_json::from_value::<RpcErrorPayload>(error_value)?;
let _ = sender.send(Err(payload));
}
}
_ => {
warn!("收到未知 app-server 消息: {line}");
}
}
Ok(())
}
async fn fail_pending_requests(running: &RunningAppServer, message: &str) {
let mut pending = running.pending.lock().await;
for (_, sender) in pending.drain() {
let _ = sender.send(Err(RpcErrorPayload {
code: -32001,
message: message.to_string(),
data: None,
}));
}
}
fn parse_initialize_info(value: &Value) -> Result<InitializeInfo> {
Ok(InitializeInfo {
user_agent: required_string(value, "userAgent")?.to_string(),
codex_home: required_string(value, "codexHome")?.to_string(),
platform_family: required_string(value, "platformFamily")?.to_string(),
platform_os: required_string(value, "platformOs")?.to_string(),
})
}
fn required_string<'a>(value: &'a Value, key: &str) -> Result<&'a str> {
value
.get(key)
.and_then(Value::as_str)
.with_context(|| format!("缺少字段 {key}"))
}
fn build_spawn_error_context(launch_config: &AppServerLaunchConfig) -> String {
let cwd = std::env::current_dir()
.map(|path| path.display().to_string())
.unwrap_or_else(|_| "<unknown>".to_string());
let path_env = std::env::var("PATH").unwrap_or_else(|_| "<unset>".to_string());
let codex_home = launch_config
.codex_home
.as_ref()
.map(|path| path.display().to_string())
.unwrap_or_else(|| "<unset>".to_string());
format!(
"启动 {} app-server 失败(runtime={} cwd={} CODEX_HOME={} PATH={})",
launch_config.codex_binary, launch_config.runtime_id, cwd, codex_home, path_env
)
}
fn parse_app_server_stderr_line(line: &str) -> (String, String) {
let normalized = line.trim().to_string();
let upper = normalized.to_uppercase();
let level = if upper.contains(" ERROR ") || upper.starts_with("ERROR ") {
"error"
} else if upper.contains(" WARN ") || upper.starts_with("WARN ") || upper.contains(" WARNING ")
{
"warn"
} else if upper.contains(" DEBUG ") || upper.starts_with("DEBUG ") {
"debug"
} else {
"info"
};
(level.to_string(), normalized)
}