codex-mobile-bridge 0.3.10

Remote bridge and service manager for codex-mobile.
Documentation
use std::collections::HashMap;
use std::sync::{
    Arc,
    atomic::{AtomicBool, AtomicU64, Ordering},
};

use anyhow::{Context, Result, anyhow, bail};
use serde_json::{Value, json};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::process::Command;
use tokio::sync::{Mutex, mpsc, oneshot};
use tokio::time::{Duration, timeout};

use super::handshake::build_spawn_error_context;
use super::streams::{spawn_stderr_task, spawn_stdout_task, spawn_wait_task};
use super::{AppServerInbound, AppServerLaunchConfig, RunningAppServer};

impl RunningAppServer {
    pub(super) async fn spawn(
        launch_config: AppServerLaunchConfig,
        inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
    ) -> Result<Arc<Self>> {
        let mut command = Command::new(&launch_config.codex_binary);
        command.args(["app-server", "--listen", "stdio://"]);
        command.stdin(std::process::Stdio::piped());
        command.stdout(std::process::Stdio::piped());
        command.stderr(std::process::Stdio::piped());
        command.kill_on_drop(true);
        configure_child_process(&mut command);
        command.env("CODEX_MOBILE_MANAGED", "1");
        command.env("CODEX_MOBILE_RUNTIME_ID", &launch_config.runtime_id);

        if let Some(codex_home) = launch_config.codex_home.as_ref() {
            command.env("CODEX_HOME", codex_home);
        }

        let mut child = command
            .spawn()
            .with_context(|| build_spawn_error_context(&launch_config))?;
        let pid = child.id();

        let stdin = child.stdin.take().context("获取 app-server stdin 失败")?;
        let stdout = child.stdout.take().context("获取 app-server stdout 失败")?;
        let stderr = child.stderr.take().context("获取 app-server stderr 失败")?;

        let running = Arc::new(Self {
            runtime_id: launch_config.runtime_id.clone(),
            stdin: Arc::new(Mutex::new(BufWriter::new(stdin))),
            child: Arc::new(Mutex::new(Some(child))),
            pending: Arc::new(Mutex::new(HashMap::new())),
            next_id: AtomicU64::new(1),
            alive: Arc::new(AtomicBool::new(true)),
            stopping: Arc::new(AtomicBool::new(false)),
        });

        let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
            runtime_id: launch_config.runtime_id.clone(),
            pid,
            running: true,
        });

        spawn_stdout_task(
            Arc::clone(&running),
            launch_config.runtime_id.clone(),
            stdout,
            inbound_tx.clone(),
        );
        spawn_stderr_task(launch_config.runtime_id.clone(), stderr, inbound_tx.clone());
        spawn_wait_task(Arc::clone(&running), inbound_tx);

        Ok(running)
    }

    pub(super) fn is_alive(&self) -> bool {
        self.alive.load(Ordering::SeqCst)
    }

    pub(super) async fn stop(&self) -> Result<()> {
        if !self.is_alive() {
            return Ok(());
        }

        self.stopping.store(true, Ordering::SeqCst);
        self.kill_process().await
    }

    pub(super) async fn abort(&self) -> Result<()> {
        if !self.is_alive() {
            return Ok(());
        }

        self.kill_process().await
    }

    async fn kill_process(&self) -> Result<()> {
        let child_pid = {
            let child_guard = self.child.lock().await;
            child_guard.as_ref().and_then(|child| child.id())
        };
        let Some(child_pid) = child_pid else {
            return Ok(());
        };

        #[cfg(unix)]
        {
            terminate_process_group(child_pid)
                .with_context(|| format!("停止 runtime {} 的进程组失败", self.runtime_id))?;
            if self
                .wait_for_exit(Duration::from_secs(5))
                .await
                .unwrap_or(false)
            {
                return Ok(());
            }
            force_kill_process_group(child_pid)
                .with_context(|| format!("强制终止 runtime {} 的进程组失败", self.runtime_id))?;
        }

        let mut child_guard = self.child.lock().await;
        let Some(child) = child_guard.as_mut() else {
            return Ok(());
        };
        child
            .start_kill()
            .with_context(|| format!("停止 runtime {} 失败(pid={child_pid})", self.runtime_id))?;
        Ok(())
    }

    async fn wait_for_exit(&self, timeout_duration: Duration) -> Result<bool> {
        match timeout(timeout_duration, async {
            loop {
                if !self.is_alive() {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        })
        .await
        {
            Ok(()) => Ok(true),
            Err(_) => Ok(false),
        }
    }

    pub(super) async fn request(&self, method: &str, params: Value) -> Result<Value> {
        if !self.is_alive() {
            bail!("app-server 未运行");
        }

        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
        let key = id.to_string();
        let (tx, rx) = oneshot::channel();
        {
            let mut pending = self.pending.lock().await;
            pending.insert(key.clone(), tx);
        }

        self.send_json(json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params,
        }))
        .await?;

        match timeout(Duration::from_secs(90), rx).await {
            Ok(Ok(Ok(result))) => Ok(result),
            Ok(Ok(Err(error))) => Err(anyhow!(error.to_string())),
            Ok(Err(_)) => Err(anyhow!("等待 app-server 响应时通道关闭")),
            Err(_) => {
                self.pending.lock().await.remove(&key);
                Err(anyhow!("等待 app-server 响应超时"))
            }
        }
    }

    pub(super) async fn respond(&self, id: Value, result: Value) -> Result<()> {
        self.send_json(json!({
            "jsonrpc": "2.0",
            "id": id,
            "result": result,
        }))
        .await
    }

    pub(super) async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
        self.send_json(json!({
            "jsonrpc": "2.0",
            "id": id,
            "error": {
                "code": code,
                "message": message,
            }
        }))
        .await
    }

    async fn send_json(&self, payload: Value) -> Result<()> {
        let line = serde_json::to_string(&payload)?;
        let mut writer = self.stdin.lock().await;
        writer.write_all(line.as_bytes()).await?;
        writer.write_all(b"\n").await?;
        writer.flush().await?;
        Ok(())
    }

    async fn send_notification(&self, method: &str, params: Option<Value>) -> Result<()> {
        let mut payload = json!({
            "jsonrpc": "2.0",
            "method": method,
        });
        if let Some(params) = params {
            payload["params"] = params;
        }
        self.send_json(payload).await
    }

    pub(super) async fn notify_initialized(&self) -> Result<()> {
        self.send_notification("initialized", None).await
    }
}

fn configure_child_process(command: &mut Command) {
    #[cfg(unix)]
    {
        command.process_group(0);
    }
}

#[cfg(unix)]
fn terminate_process_group(child_pid: u32) -> Result<()> {
    send_signal_to_process_group(child_pid, libc::SIGTERM)
}

#[cfg(unix)]
fn force_kill_process_group(child_pid: u32) -> Result<()> {
    send_signal_to_process_group(child_pid, libc::SIGKILL)
}

#[cfg(unix)]
fn send_signal_to_process_group(child_pid: u32, signal: i32) -> Result<()> {
    let process_group_id = -(child_pid as i32);
    let result = unsafe { libc::kill(process_group_id, signal) };
    if result == 0 {
        return Ok(());
    }

    let error = std::io::Error::last_os_error();
    if matches!(error.raw_os_error(), Some(libc::ESRCH)) {
        return Ok(());
    }
    Err(error.into())
}