use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use tempfile::tempdir;
use tokio::sync::mpsc;
use tokio::time::{Duration, timeout};
use super::{
APP_SERVER_EXPERIMENTAL_API_ENABLED, AppServerInbound, AppServerLaunchConfig, AppServerManager,
};
use crate::app_server::handshake::default_opt_out_notification_methods;
/// Verifies the startup handshake against a fake `codex` executable.
///
/// The fake server is a Python script that replies to `initialize` and, once it receives
/// `initialized`, dumps every message it has read to `received.json` and exits. The test
/// checks both the `Initialized` inbound event emitted by the manager and the messages the
/// fake server recorded on the wire.
#[tokio::test]
async fn start_sends_initialize_initialized_and_opt_out_notifications() {
    let temp_dir = tempdir().expect("创建临时目录失败");
    let log_path = temp_dir.path().join("received.json");
    let script_path = temp_dir.path().join("fake-codex");

    // The Python source must keep its top-level statements at column 0 and its nested
    // statements indented: Python derives block structure from indentation, so the raw
    // string cannot follow the surrounding Rust indentation.
    // `{log_path:?}` renders the path as a double-quoted, escaped literal, which Python
    // reads as a string literal for ordinary paths.
    fs::write(
        &script_path,
        format!(
            r#"#!/usr/bin/env python3
import json
import pathlib
import sys
messages = []
log_path = pathlib.Path({log_path:?})
for raw_line in sys.stdin:
    line = raw_line.strip()
    if not line:
        continue
    message = json.loads(line)
    messages.append(message)
    if message.get("method") == "initialize":
        print(json.dumps({{
            "jsonrpc": "2.0",
            "id": message["id"],
            "result": {{
                "userAgent": "codex-test",
                "codexHome": "/tmp/codex-home",
                "platformFamily": "unix",
                "platformOs": "linux"
            }}
        }}), flush=True)
    elif message.get("method") == "initialized":
        log_path.write_text(json.dumps(messages))
        break
"#,
            log_path = log_path.display().to_string(),
        ),
    )
    .expect("写入 fake codex 脚本失败");

    // Mark the script executable so it can be launched via its shebang line.
    let mut permissions = fs::metadata(&script_path)
        .expect("读取脚本权限失败")
        .permissions();
    permissions.set_mode(0o755);
    fs::set_permissions(&script_path, permissions).expect("设置脚本权限失败");

    let (inbound_tx, mut inbound_rx) = mpsc::unbounded_channel();
    let manager = AppServerManager::new(
        AppServerLaunchConfig {
            runtime_id: "primary".to_string(),
            codex_binary: script_path.display().to_string(),
            codex_home: None,
        },
        inbound_tx,
    );
    manager.start().await.expect("启动 app-server manager 失败");

    // Skip unrelated inbound events until the `Initialized` event shows up.
    let (experimental_api_enabled, opt_out_notification_methods) =
        timeout(Duration::from_secs(5), async {
            loop {
                match inbound_rx.recv().await {
                    Some(AppServerInbound::Initialized {
                        experimental_api_enabled,
                        opt_out_notification_methods,
                        ..
                    }) => break (experimental_api_enabled, opt_out_notification_methods),
                    Some(_) => {}
                    None => panic!("inbound channel 意外关闭"),
                }
            }
        })
        .await
        .expect("等待 Initialized 事件超时");
    assert!(experimental_api_enabled);
    assert_eq!(
        opt_out_notification_methods,
        default_opt_out_notification_methods()
    );

    // The fake server writes the log asynchronously; poll until the file holds valid JSON
    // (a missing or partially written file simply triggers another poll).
    let messages = timeout(Duration::from_secs(5), async {
        loop {
            if log_path.exists() {
                let recorded = fs::read_to_string(&log_path).expect("读取记录文件失败");
                if let Ok(messages) = serde_json::from_str::<serde_json::Value>(&recorded) {
                    break messages;
                }
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    })
    .await
    .expect("等待 fake codex 记录消息超时");

    let entries = messages.as_array().expect("记录消息应为数组");
    assert_eq!(entries.len(), 2);
    assert_eq!(entries[0]["method"], "initialize");
    assert_eq!(
        entries[0]["params"]["capabilities"]["experimentalApi"],
        APP_SERVER_EXPERIMENTAL_API_ENABLED
    );
    assert_eq!(
        entries[0]["params"]["capabilities"]["optOutNotificationMethods"],
        serde_json::to_value(default_opt_out_notification_methods())
            .expect("序列化 opt-out 列表失败")
    );
    assert_eq!(entries[1]["method"], "initialized");
}
/// Verifies that stopping the manager also terminates processes spawned by the server.
///
/// The fake `codex` script starts a `sleep 60` child, records the child's pid in
/// `child.pid`, completes the `initialize` exchange and then idles forever. After
/// `manager.stop()` the test waits (up to five seconds) for that child pid to disappear.
#[tokio::test]
async fn stop_kills_spawned_process_group() {
    let temp_dir = tempdir().expect("创建临时目录失败");
    let child_pid_path = temp_dir.path().join("child.pid");
    let script_path = temp_dir.path().join("fake-codex");

    // The Python source must keep its top-level statements at column 0 and its nested
    // statements indented: Python derives block structure from indentation, so the raw
    // string cannot follow the surrounding Rust indentation.
    // `{child_pid_path:?}` renders the path as a double-quoted, escaped literal, which
    // Python reads as a string literal for ordinary paths.
    fs::write(
        &script_path,
        format!(
            r#"#!/usr/bin/env python3
import json
import pathlib
import subprocess
import sys
import time
child_pid_path = pathlib.Path({child_pid_path:?})
child = subprocess.Popen(["sleep", "60"])
child_pid_path.write_text(str(child.pid))
for raw_line in sys.stdin:
    line = raw_line.strip()
    if not line:
        continue
    message = json.loads(line)
    if message.get("method") == "initialize":
        print(json.dumps({{
            "jsonrpc": "2.0",
            "id": message["id"],
            "result": {{
                "userAgent": "codex-test",
                "codexHome": "/tmp/codex-home",
                "platformFamily": "unix",
                "platformOs": "linux"
            }}
        }}), flush=True)
    elif message.get("method") == "initialized":
        while True:
            time.sleep(1)
"#,
            child_pid_path = child_pid_path.display().to_string(),
        ),
    )
    .expect("写入 fake codex 脚本失败");

    // Mark the script executable so it can be launched via its shebang line.
    let mut permissions = fs::metadata(&script_path)
        .expect("读取脚本权限失败")
        .permissions();
    permissions.set_mode(0o755);
    fs::set_permissions(&script_path, permissions).expect("设置脚本权限失败");

    // The receiver is bound to a named variable so it stays alive for the whole test.
    let (inbound_tx, _inbound_rx) = mpsc::unbounded_channel();
    let manager = AppServerManager::new(
        AppServerLaunchConfig {
            runtime_id: "primary".to_string(),
            codex_binary: script_path.display().to_string(),
            codex_home: None,
        },
        inbound_tx,
    );
    manager.start().await.expect("启动 app-server manager 失败");

    // Sanity check: the grandchild must be running before we stop the manager.
    let child_pid = wait_for_child_pid(&child_pid_path).await;
    assert!(process_exists(child_pid));

    manager.stop().await.expect("停止 app-server manager 失败");

    // Termination is not instantaneous; poll until the pid is gone or the timeout fires.
    timeout(Duration::from_secs(5), async {
        loop {
            if !process_exists(child_pid) {
                return;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    })
    .await
    .expect("等待子进程退出超时");
}
/// Polls `path` every 50 ms until it contains text that parses as a `u32` pid, then
/// returns that pid.
///
/// A missing, unreadable, or not-yet-parseable file just causes another poll.
///
/// # Panics
///
/// Panics if no pid could be read within five seconds.
async fn wait_for_child_pid(path: &Path) -> u32 {
    let poll_for_pid = async {
        loop {
            let parsed = fs::read_to_string(path)
                .ok()
                .and_then(|text| text.trim().parse::<u32>().ok());
            match parsed {
                Some(pid) => break pid,
                None => tokio::time::sleep(Duration::from_millis(50)).await,
            }
        }
    };
    timeout(Duration::from_secs(5), poll_for_pid)
        .await
        .expect("等待 fake codex 子进程 pid 超时")
}
/// Reports whether a process with the given `pid` currently exists.
///
/// Probes with `kill(pid, 0)`: signal `0` delivers nothing and only performs the
/// existence/permission checks. Any failure other than `ESRCH` (for example `EPERM`)
/// is treated as "the process exists".
///
/// Returns `false` for `pid == 0` and for values above `i32::MAX`. `kill` treats zero
/// and negative ids as process-group or broadcast targets, so a wrapped cast would probe
/// something other than a single process and could wrongly report "exists".
fn process_exists(pid: u32) -> bool {
    if pid == 0 || pid > i32::MAX as u32 {
        return false;
    }
    // SAFETY: `kill` takes plain integers and, with signal 0, only performs error
    // checking; it does not touch memory owned by this program. The cast is lossless
    // because the range was checked above.
    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
    if result == 0 {
        return true;
    }
    // `errno` must be read right after the failing call, before anything can clobber it.
    !matches!(
        std::io::Error::last_os_error().raw_os_error(),
        Some(libc::ESRCH)
    )
}