use std::collections::HashMap;
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use anyhow::{Context, Result, anyhow, bail};
use serde_json::{Value, json};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::process::Command;
use tokio::sync::{Mutex, mpsc, oneshot};
use tokio::time::{Duration, timeout};
use super::handshake::build_spawn_error_context;
use super::streams::{spawn_stderr_task, spawn_stdout_task, spawn_wait_task};
use super::{AppServerInbound, AppServerLaunchConfig, RunningAppServer};
impl RunningAppServer {
pub(super) async fn spawn(
launch_config: AppServerLaunchConfig,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) -> Result<Arc<Self>> {
let mut command = Command::new(&launch_config.codex_binary);
command.args(["app-server", "--listen", "stdio://"]);
command.stdin(std::process::Stdio::piped());
command.stdout(std::process::Stdio::piped());
command.stderr(std::process::Stdio::piped());
command.kill_on_drop(true);
configure_child_process(&mut command);
command.env("CODEX_MOBILE_MANAGED", "1");
command.env("CODEX_MOBILE_RUNTIME_ID", &launch_config.runtime_id);
if let Some(codex_home) = launch_config.codex_home.as_ref() {
command.env("CODEX_HOME", codex_home);
}
let mut child = command
.spawn()
.with_context(|| build_spawn_error_context(&launch_config))?;
let pid = child.id();
let stdin = child.stdin.take().context("获取 app-server stdin 失败")?;
let stdout = child.stdout.take().context("获取 app-server stdout 失败")?;
let stderr = child.stderr.take().context("获取 app-server stderr 失败")?;
let running = Arc::new(Self {
runtime_id: launch_config.runtime_id.clone(),
stdin: Arc::new(Mutex::new(BufWriter::new(stdin))),
child: Arc::new(Mutex::new(Some(child))),
pending: Arc::new(Mutex::new(HashMap::new())),
next_id: AtomicU64::new(1),
alive: Arc::new(AtomicBool::new(true)),
stopping: Arc::new(AtomicBool::new(false)),
});
let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
runtime_id: launch_config.runtime_id.clone(),
pid,
running: true,
});
spawn_stdout_task(
Arc::clone(&running),
launch_config.runtime_id.clone(),
stdout,
inbound_tx.clone(),
);
spawn_stderr_task(launch_config.runtime_id.clone(), stderr, inbound_tx.clone());
spawn_wait_task(Arc::clone(&running), inbound_tx);
Ok(running)
}
pub(super) fn is_alive(&self) -> bool {
self.alive.load(Ordering::SeqCst)
}
pub(super) async fn stop(&self) -> Result<()> {
if !self.is_alive() {
return Ok(());
}
self.stopping.store(true, Ordering::SeqCst);
self.kill_process().await
}
pub(super) async fn abort(&self) -> Result<()> {
if !self.is_alive() {
return Ok(());
}
self.kill_process().await
}
async fn kill_process(&self) -> Result<()> {
let child_pid = {
let child_guard = self.child.lock().await;
child_guard.as_ref().and_then(|child| child.id())
};
let Some(child_pid) = child_pid else {
return Ok(());
};
#[cfg(unix)]
{
terminate_process_group(child_pid)
.with_context(|| format!("停止 runtime {} 的进程组失败", self.runtime_id))?;
if self
.wait_for_exit(Duration::from_secs(5))
.await
.unwrap_or(false)
{
return Ok(());
}
force_kill_process_group(child_pid)
.with_context(|| format!("强制终止 runtime {} 的进程组失败", self.runtime_id))?;
}
let mut child_guard = self.child.lock().await;
let Some(child) = child_guard.as_mut() else {
return Ok(());
};
child
.start_kill()
.with_context(|| format!("停止 runtime {} 失败(pid={child_pid})", self.runtime_id))?;
Ok(())
}
async fn wait_for_exit(&self, timeout_duration: Duration) -> Result<bool> {
match timeout(timeout_duration, async {
loop {
if !self.is_alive() {
return;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
})
.await
{
Ok(()) => Ok(true),
Err(_) => Ok(false),
}
}
pub(super) async fn request(&self, method: &str, params: Value) -> Result<Value> {
if !self.is_alive() {
bail!("app-server 未运行");
}
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let key = id.to_string();
let (tx, rx) = oneshot::channel();
{
let mut pending = self.pending.lock().await;
pending.insert(key.clone(), tx);
}
self.send_json(json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params,
}))
.await?;
match timeout(Duration::from_secs(90), rx).await {
Ok(Ok(Ok(result))) => Ok(result),
Ok(Ok(Err(error))) => Err(anyhow!(error.to_string())),
Ok(Err(_)) => Err(anyhow!("等待 app-server 响应时通道关闭")),
Err(_) => {
self.pending.lock().await.remove(&key);
Err(anyhow!("等待 app-server 响应超时"))
}
}
}
pub(super) async fn respond(&self, id: Value, result: Value) -> Result<()> {
self.send_json(json!({
"jsonrpc": "2.0",
"id": id,
"result": result,
}))
.await
}
pub(super) async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
self.send_json(json!({
"jsonrpc": "2.0",
"id": id,
"error": {
"code": code,
"message": message,
}
}))
.await
}
async fn send_json(&self, payload: Value) -> Result<()> {
let line = serde_json::to_string(&payload)?;
let mut writer = self.stdin.lock().await;
writer.write_all(line.as_bytes()).await?;
writer.write_all(b"\n").await?;
writer.flush().await?;
Ok(())
}
async fn send_notification(&self, method: &str, params: Option<Value>) -> Result<()> {
let mut payload = json!({
"jsonrpc": "2.0",
"method": method,
});
if let Some(params) = params {
payload["params"] = params;
}
self.send_json(payload).await
}
pub(super) async fn notify_initialized(&self) -> Result<()> {
self.send_notification("initialized", None).await
}
}
fn configure_child_process(command: &mut Command) {
#[cfg(unix)]
{
command.process_group(0);
}
}
#[cfg(unix)]
fn terminate_process_group(child_pid: u32) -> Result<()> {
send_signal_to_process_group(child_pid, libc::SIGTERM)
}
#[cfg(unix)]
fn force_kill_process_group(child_pid: u32) -> Result<()> {
send_signal_to_process_group(child_pid, libc::SIGKILL)
}
#[cfg(unix)]
fn send_signal_to_process_group(child_pid: u32, signal: i32) -> Result<()> {
let process_group_id = -(child_pid as i32);
let result = unsafe { libc::kill(process_group_id, signal) };
if result == 0 {
return Ok(());
}
let error = std::io::Error::last_os_error();
if matches!(error.raw_os_error(), Some(libc::ESRCH)) {
return Ok(());
}
Err(error.into())
}