// codex-mobile-bridge 0.3.3
//
// Remote bridge and service manager for codex-mobile.
// Documentation
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;

use tempfile::tempdir;
use tokio::sync::mpsc;
use tokio::time::{Duration, timeout};

use super::{
    APP_SERVER_EXPERIMENTAL_API_ENABLED, AppServerInbound, AppServerLaunchConfig, AppServerManager,
};
use crate::app_server::handshake::default_opt_out_notification_methods;

/// Verifies the startup handshake performed by `AppServerManager::start`.
///
/// A fake `codex` executable (a Python script) records every JSON message it receives on
/// stdin, answers `initialize`, and dumps the recorded messages to a log file once it sees
/// `initialized`. The test then checks:
///
/// * the `AppServerInbound::Initialized` event carries the experimental-API flag and the
///   default opt-out notification list, and
/// * exactly two messages were sent (`initialize`, then `initialized`), with the
///   `initialize` capabilities matching the same values.
///
/// The script publishes its log by writing a sibling temp file and renaming it over the
/// final path. The test polls for the file's existence, so an in-place write could be
/// observed while still empty or partial and make the JSON parse fail intermittently.
#[tokio::test]
async fn start_sends_initialize_initialized_and_opt_out_notifications() {
    let temp_dir = tempdir().expect("创建临时目录失败");
    let log_path = temp_dir.path().join("received.json");
    let script_path = temp_dir.path().join("fake-codex");
    fs::write(
        &script_path,
        format!(
            r#"#!/usr/bin/env python3
import json
import pathlib
import sys

messages = []
log_path = pathlib.Path({log_path:?})
for raw_line in sys.stdin:
    line = raw_line.strip()
    if not line:
        continue
    message = json.loads(line)
    messages.append(message)
    if message.get("method") == "initialize":
        print(json.dumps({{
            "jsonrpc": "2.0",
            "id": message["id"],
            "result": {{
                "userAgent": "codex-test",
                "codexHome": "/tmp/codex-home",
                "platformFamily": "unix",
                "platformOs": "linux"
            }}
        }}), flush=True)
    elif message.get("method") == "initialized":
        # Publish atomically: the reader polls for existence, so the final
        # path must only appear once its content is complete.
        tmp_path = log_path.with_name(log_path.name + ".tmp")
        tmp_path.write_text(json.dumps(messages))
        tmp_path.replace(log_path)
        break
"#,
            log_path = log_path.display().to_string(),
        ),
    )
    .expect("写入 fake codex 脚本失败");

    // Mark the script executable so it can be launched as the codex binary.
    let mut permissions = fs::metadata(&script_path)
        .expect("读取脚本权限失败")
        .permissions();
    permissions.set_mode(0o755);
    fs::set_permissions(&script_path, permissions).expect("设置脚本权限失败");

    let (inbound_tx, mut inbound_rx) = mpsc::unbounded_channel();
    let manager = AppServerManager::new(
        AppServerLaunchConfig {
            runtime_id: "primary".to_string(),
            codex_binary: script_path.display().to_string(),
            codex_home: None,
        },
        inbound_tx,
    );

    manager.start().await.expect("启动 app-server manager 失败");

    // Skip unrelated inbound events until the Initialized event arrives.
    let initialized = timeout(Duration::from_secs(5), async {
        loop {
            match inbound_rx.recv().await {
                Some(AppServerInbound::Initialized {
                    experimental_api_enabled,
                    opt_out_notification_methods,
                    ..
                }) => break (experimental_api_enabled, opt_out_notification_methods),
                Some(_) => {}
                None => panic!("inbound channel 意外关闭"),
            }
        }
    })
    .await
    .expect("等待 Initialized 事件超时");

    assert!(initialized.0);
    assert_eq!(initialized.1, default_opt_out_notification_methods());

    // The log only appears at its final path once fully written (see the rename in the
    // script), so reading immediately after `exists()` is safe.
    let recorded = timeout(Duration::from_secs(5), async {
        loop {
            if log_path.exists() {
                break fs::read_to_string(&log_path).expect("读取记录文件失败");
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    })
    .await
    .expect("等待 fake codex 记录消息超时");
    let messages: serde_json::Value = serde_json::from_str(&recorded).expect("解析记录消息失败");
    let entries = messages.as_array().expect("记录消息应为数组");

    assert_eq!(entries.len(), 2);
    assert_eq!(entries[0]["method"], "initialize");
    assert_eq!(
        entries[0]["params"]["capabilities"]["experimentalApi"],
        APP_SERVER_EXPERIMENTAL_API_ENABLED
    );
    assert_eq!(
        entries[0]["params"]["capabilities"]["optOutNotificationMethods"],
        serde_json::to_value(default_opt_out_notification_methods())
            .expect("序列化 opt-out 列表失败")
    );
    assert_eq!(entries[1]["method"], "initialized");
}

/// Checks that `AppServerManager::stop` also terminates a process spawned by the app-server.
///
/// The fake `codex` script launches a `sleep 60` child, writes that child's PID to a file,
/// answers `initialize`, and then idles forever once it receives `initialized`. After
/// `stop()` returns, the test waits up to 5 seconds for the recorded PID to disappear.
#[tokio::test]
async fn stop_kills_spawned_process_group() {
    let workspace = tempdir().expect("创建临时目录失败");
    let child_pid_path = workspace.path().join("child.pid");
    let script_path = workspace.path().join("fake-codex");

    // Render the fake codex script with the PID file location baked in.
    let script = format!(
        r#"#!/usr/bin/env python3
import json
import pathlib
import subprocess
import sys
import time

child_pid_path = pathlib.Path({child_pid_path:?})
child = subprocess.Popen(["sleep", "60"])
child_pid_path.write_text(str(child.pid))

for raw_line in sys.stdin:
    line = raw_line.strip()
    if not line:
        continue
    message = json.loads(line)
    if message.get("method") == "initialize":
        print(json.dumps({{
            "jsonrpc": "2.0",
            "id": message["id"],
            "result": {{
                "userAgent": "codex-test",
                "codexHome": "/tmp/codex-home",
                "platformFamily": "unix",
                "platformOs": "linux"
            }}
        }}), flush=True)
    elif message.get("method") == "initialized":
        while True:
            time.sleep(1)
"#,
        child_pid_path = child_pid_path.display().to_string(),
    );
    fs::write(&script_path, script).expect("写入 fake codex 脚本失败");

    // Make the script executable so it can stand in for the codex binary.
    let mut script_mode = fs::metadata(&script_path)
        .expect("读取脚本权限失败")
        .permissions();
    script_mode.set_mode(0o755);
    fs::set_permissions(&script_path, script_mode).expect("设置脚本权限失败");

    // The receiver is bound rather than dropped so the channel stays open for the whole test.
    let (inbound_tx, _inbound_rx) = mpsc::unbounded_channel();
    let launch_config = AppServerLaunchConfig {
        runtime_id: "primary".to_string(),
        codex_binary: script_path.display().to_string(),
        codex_home: None,
    };
    let manager = AppServerManager::new(launch_config, inbound_tx);

    manager.start().await.expect("启动 app-server manager 失败");
    let child_pid = wait_for_child_pid(&child_pid_path).await;
    assert!(process_exists(child_pid));

    manager.stop().await.expect("停止 app-server manager 失败");

    // Poll until the spawned child is gone, bounded by the timeout below.
    let child_gone = async {
        while process_exists(child_pid) {
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    };
    timeout(Duration::from_secs(5), child_gone)
        .await
        .expect("等待子进程退出超时");
}

/// Polls `path` every 50 ms until it contains a value that parses as a `u32`, then returns it.
///
/// Read failures and unparsable content (for example a file that does not exist yet or is
/// still empty) are treated as "not ready" and retried.
///
/// # Panics
///
/// Panics if no valid PID could be read within 5 seconds.
async fn wait_for_child_pid(path: &Path) -> u32 {
    let poll_for_pid = async {
        loop {
            let parsed = fs::read_to_string(path)
                .ok()
                .and_then(|text| text.trim().parse::<u32>().ok());
            match parsed {
                Some(pid) => break pid,
                None => tokio::time::sleep(Duration::from_millis(50)).await,
            }
        }
    };

    timeout(Duration::from_secs(5), poll_for_pid)
        .await
        .expect("等待 fake codex 子进程 pid 超时")
}

/// Reports whether a process with the given PID currently exists.
///
/// Probes the process with `kill(pid, 0)`: signal 0 delivers nothing and only performs the
/// existence/permission checks. Any failure other than `ESRCH` (for example `EPERM`) is
/// treated as "exists", because the process is there even if it cannot be signalled.
///
/// Returns `false` for PID 0 and for values that do not fit in a positive `i32`. `kill`
/// interprets 0 and negative arguments as process-group selectors, so passing them through
/// would probe a whole group instead of a single process.
fn process_exists(pid: u32) -> bool {
    // Reject values that would turn into a group selector after the cast below.
    if pid == 0 || pid > i32::MAX as u32 {
        return false;
    }
    let pid = pid as i32;

    // SAFETY: `kill` takes plain integer arguments and has no memory-safety preconditions;
    // signal 0 only performs error checking and delivers no signal.
    let result = unsafe { libc::kill(pid, 0) };
    if result == 0 {
        return true;
    }

    // Only ESRCH means "no such process"; other errors imply the process exists.
    !matches!(
        std::io::Error::last_os_error().raw_os_error(),
        Some(libc::ESRCH)
    )
}