Skip to main content

codex_mobile_bridge/
app_server.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{
4    Arc,
5    atomic::{AtomicBool, AtomicU64, Ordering},
6};
7
8use anyhow::{Context, Result, anyhow, bail};
9use serde::{Deserialize, Serialize};
10use serde_json::{Value, json};
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
12use tokio::process::{Child, ChildStdin, Command};
13use tokio::sync::{Mutex, mpsc, oneshot};
14use tokio::time::{Duration, sleep, timeout};
15use tracing::warn;
16
17use crate::bridge_protocol::{json_string, now_millis};
18
19const APP_SERVER_EXPERIMENTAL_API_ENABLED: bool = true;
20const APP_SERVER_OPT_OUT_NOTIFICATION_METHODS: &[&str] = &[
21    "account/login/completed",
22    "account/rateLimits/updated",
23    "account/updated",
24    "app/list/updated",
25    "fs/changed",
26    "fuzzyFileSearch/sessionCompleted",
27    "fuzzyFileSearch/sessionUpdated",
28    "hook/completed",
29    "hook/started",
30    "item/autoApprovalReview/completed",
31    "item/autoApprovalReview/started",
32    "item/commandExecution/terminalInteraction",
33    "mcpServer/oauthLogin/completed",
34    "mcpServer/startupStatus/updated",
35    "skills/changed",
36    "thread/compacted",
37    "thread/realtime/closed",
38    "thread/realtime/error",
39    "thread/realtime/itemAdded",
40    "thread/realtime/outputAudio/delta",
41    "thread/realtime/sdp",
42    "thread/realtime/started",
43    "thread/realtime/transcriptUpdated",
44    "windows/worldWritableWarning",
45    "windowsSandbox/setupCompleted",
46];
47
48#[derive(Debug, Clone)]
49pub struct AppServerLaunchConfig {
50    pub runtime_id: String,
51    pub codex_binary: String,
52    pub codex_home: Option<PathBuf>,
53}
54
55#[derive(Debug, Clone)]
56pub struct InitializeInfo {
57    pub user_agent: String,
58    pub codex_home: String,
59    pub platform_family: String,
60    pub platform_os: String,
61}
62
63#[derive(Debug, Clone)]
64pub enum AppServerInbound {
65    Starting {
66        runtime_id: String,
67    },
68    ProcessChanged {
69        runtime_id: String,
70        pid: Option<u32>,
71        running: bool,
72    },
73    Initializing {
74        runtime_id: String,
75        experimental_api_enabled: bool,
76        opt_out_notification_methods: Vec<String>,
77    },
78    Initialized {
79        runtime_id: String,
80        info: InitializeInfo,
81        experimental_api_enabled: bool,
82        opt_out_notification_methods: Vec<String>,
83    },
84    HandshakeFailed {
85        runtime_id: String,
86        message: String,
87        experimental_api_enabled: bool,
88        opt_out_notification_methods: Vec<String>,
89    },
90    Notification {
91        runtime_id: String,
92        method: String,
93        params: Value,
94    },
95    ServerRequest {
96        runtime_id: String,
97        id: Value,
98        method: String,
99        params: Value,
100    },
101    Exited {
102        runtime_id: String,
103        message: String,
104        expected: bool,
105    },
106    LogChunk {
107        runtime_id: String,
108        stream: String,
109        level: String,
110        source: String,
111        message: String,
112        detail: Option<Value>,
113        occurred_at_ms: i64,
114    },
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct RpcErrorPayload {
119    pub code: i64,
120    pub message: String,
121    #[serde(default)]
122    pub data: Option<Value>,
123}
124
125impl std::fmt::Display for RpcErrorPayload {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        write!(f, "[{}] {}", self.code, self.message)
128    }
129}
130
131pub struct AppServerManager {
132    launch_config: AppServerLaunchConfig,
133    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
134    inner: Mutex<Option<Arc<RunningAppServer>>>,
135}
136
137impl AppServerManager {
138    pub fn new(
139        launch_config: AppServerLaunchConfig,
140        inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
141    ) -> Self {
142        Self {
143            launch_config,
144            inbound_tx,
145            inner: Mutex::new(None),
146        }
147    }
148
149    pub fn runtime_id(&self) -> &str {
150        &self.launch_config.runtime_id
151    }
152
153    pub async fn start(&self) -> Result<()> {
154        self.ensure_started().await.map(|_| ())
155    }
156
157    pub async fn stop(&self) -> Result<()> {
158        let existing = {
159            let mut guard = self.inner.lock().await;
160            guard.take()
161        };
162
163        if let Some(existing) = existing {
164            existing.stop().await?;
165            for _ in 0..30 {
166                if !existing.is_alive() {
167                    break;
168                }
169                sleep(Duration::from_millis(100)).await;
170            }
171        }
172
173        Ok(())
174    }
175
176    pub async fn restart(&self) -> Result<()> {
177        self.stop().await?;
178        self.start().await
179    }
180
181    pub async fn request(&self, method: &str, params: Value) -> Result<Value> {
182        let running = self.ensure_started().await?;
183        running.request(method, params).await
184    }
185
186    pub async fn respond(&self, id: Value, result: Value) -> Result<()> {
187        let running = self.ensure_started().await?;
188        running.respond(id, result).await
189    }
190
191    pub async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
192        let running = self.ensure_started().await?;
193        running.respond_error(id, code, message).await
194    }
195
196    async fn ensure_started(&self) -> Result<Arc<RunningAppServer>> {
197        {
198            let guard = self.inner.lock().await;
199            if let Some(existing) = guard.as_ref().filter(|existing| existing.is_alive()) {
200                return Ok(Arc::clone(existing));
201            }
202        }
203
204        let opt_out_notification_methods = default_opt_out_notification_methods();
205        let _ = self.inbound_tx.send(AppServerInbound::Starting {
206            runtime_id: self.launch_config.runtime_id.clone(),
207        });
208        let running =
209            RunningAppServer::spawn(self.launch_config.clone(), self.inbound_tx.clone()).await?;
210        {
211            let mut guard = self.inner.lock().await;
212            *guard = Some(Arc::clone(&running));
213        }
214        let _ = self.inbound_tx.send(AppServerInbound::Initializing {
215            runtime_id: self.launch_config.runtime_id.clone(),
216            experimental_api_enabled: APP_SERVER_EXPERIMENTAL_API_ENABLED,
217            opt_out_notification_methods: opt_out_notification_methods.clone(),
218        });
219
220        let init_result = match running
221            .request(
222                "initialize",
223                json!({
224                    "clientInfo": {
225                        "name": "codex-mobile-bridge",
226                        "title": "Codex Mobile Bridge",
227                        "version": env!("CARGO_PKG_VERSION"),
228                    },
229                    "capabilities": {
230                        "experimentalApi": APP_SERVER_EXPERIMENTAL_API_ENABLED,
231                        "optOutNotificationMethods": opt_out_notification_methods,
232                    }
233                }),
234            )
235            .await
236        {
237            Ok(result) => result,
238            Err(error) => {
239                self.abort_startup(
240                    &running,
241                    APP_SERVER_EXPERIMENTAL_API_ENABLED,
242                    &default_opt_out_notification_methods(),
243                    format!("initialize 失败: {error}"),
244                )
245                .await;
246                return Err(error);
247            }
248        };
249
250        let info = match parse_initialize_info(&init_result) {
251            Ok(info) => info,
252            Err(error) => {
253                self.abort_startup(
254                    &running,
255                    APP_SERVER_EXPERIMENTAL_API_ENABLED,
256                    &default_opt_out_notification_methods(),
257                    format!("initialize 响应解析失败: {error}"),
258                )
259                .await;
260                return Err(error);
261            }
262        };
263        if let Err(error) = running.notify_initialized().await {
264            self.abort_startup(
265                &running,
266                APP_SERVER_EXPERIMENTAL_API_ENABLED,
267                &default_opt_out_notification_methods(),
268                format!("initialized 发送失败: {error}"),
269            )
270            .await;
271            return Err(error);
272        }
273        let _ = self.inbound_tx.send(AppServerInbound::Initialized {
274            runtime_id: self.launch_config.runtime_id.clone(),
275            info,
276            experimental_api_enabled: APP_SERVER_EXPERIMENTAL_API_ENABLED,
277            opt_out_notification_methods: default_opt_out_notification_methods(),
278        });
279        Ok(running)
280    }
281
282    async fn abort_startup(
283        &self,
284        running: &Arc<RunningAppServer>,
285        experimental_api_enabled: bool,
286        opt_out_notification_methods: &[String],
287        message: String,
288    ) {
289        let _ = self.inbound_tx.send(AppServerInbound::HandshakeFailed {
290            runtime_id: self.launch_config.runtime_id.clone(),
291            message,
292            experimental_api_enabled,
293            opt_out_notification_methods: opt_out_notification_methods.to_vec(),
294        });
295        let _ = running.abort().await;
296        let mut guard = self.inner.lock().await;
297        if guard
298            .as_ref()
299            .map(|existing| Arc::ptr_eq(existing, running))
300            .unwrap_or(false)
301        {
302            *guard = None;
303        }
304    }
305}
306
307struct RunningAppServer {
308    runtime_id: String,
309    stdin: Arc<Mutex<BufWriter<ChildStdin>>>,
310    child: Arc<Mutex<Option<Child>>>,
311    pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Value, RpcErrorPayload>>>>>,
312    next_id: AtomicU64,
313    alive: Arc<AtomicBool>,
314    stopping: Arc<AtomicBool>,
315}
316
317impl RunningAppServer {
318    async fn spawn(
319        launch_config: AppServerLaunchConfig,
320        inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
321    ) -> Result<Arc<Self>> {
322        let mut command = Command::new(&launch_config.codex_binary);
323        command.args(["app-server", "--listen", "stdio://"]);
324        command.stdin(std::process::Stdio::piped());
325        command.stdout(std::process::Stdio::piped());
326        command.stderr(std::process::Stdio::piped());
327        command.kill_on_drop(true);
328        command.env("CODEX_MOBILE_MANAGED", "1");
329        command.env("CODEX_MOBILE_RUNTIME_ID", &launch_config.runtime_id);
330
331        if let Some(codex_home) = launch_config.codex_home.as_ref() {
332            command.env("CODEX_HOME", codex_home);
333        }
334
335        let mut child = command
336            .spawn()
337            .with_context(|| build_spawn_error_context(&launch_config))?;
338        let pid = child.id();
339
340        let stdin = child.stdin.take().context("获取 app-server stdin 失败")?;
341        let stdout = child.stdout.take().context("获取 app-server stdout 失败")?;
342        let stderr = child.stderr.take().context("获取 app-server stderr 失败")?;
343
344        let running = Arc::new(Self {
345            runtime_id: launch_config.runtime_id.clone(),
346            stdin: Arc::new(Mutex::new(BufWriter::new(stdin))),
347            child: Arc::new(Mutex::new(Some(child))),
348            pending: Arc::new(Mutex::new(HashMap::new())),
349            next_id: AtomicU64::new(1),
350            alive: Arc::new(AtomicBool::new(true)),
351            stopping: Arc::new(AtomicBool::new(false)),
352        });
353
354        let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
355            runtime_id: launch_config.runtime_id.clone(),
356            pid,
357            running: true,
358        });
359
360        spawn_stdout_task(
361            Arc::clone(&running),
362            launch_config.runtime_id.clone(),
363            stdout,
364            inbound_tx.clone(),
365        );
366        spawn_stderr_task(launch_config.runtime_id.clone(), stderr, inbound_tx.clone());
367        spawn_wait_task(Arc::clone(&running), inbound_tx);
368
369        Ok(running)
370    }
371
372    fn is_alive(&self) -> bool {
373        self.alive.load(Ordering::SeqCst)
374    }
375
376    async fn stop(&self) -> Result<()> {
377        if !self.is_alive() {
378            return Ok(());
379        }
380
381        self.stopping.store(true, Ordering::SeqCst);
382        self.kill_process().await
383    }
384
385    async fn abort(&self) -> Result<()> {
386        if !self.is_alive() {
387            return Ok(());
388        }
389
390        self.kill_process().await
391    }
392
393    async fn kill_process(&self) -> Result<()> {
394        let mut child_guard = self.child.lock().await;
395        let Some(child) = child_guard.as_mut() else {
396            return Ok(());
397        };
398        child
399            .start_kill()
400            .with_context(|| format!("停止 runtime {} 失败", self.runtime_id))?;
401        Ok(())
402    }
403
404    async fn request(&self, method: &str, params: Value) -> Result<Value> {
405        if !self.is_alive() {
406            bail!("app-server 未运行");
407        }
408
409        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
410        let key = id.to_string();
411        let (tx, rx) = oneshot::channel();
412        {
413            let mut pending = self.pending.lock().await;
414            pending.insert(key.clone(), tx);
415        }
416
417        self.send_json(json!({
418            "jsonrpc": "2.0",
419            "id": id,
420            "method": method,
421            "params": params,
422        }))
423        .await?;
424
425        match timeout(Duration::from_secs(90), rx).await {
426            Ok(Ok(Ok(result))) => Ok(result),
427            Ok(Ok(Err(error))) => Err(anyhow!(error.to_string())),
428            Ok(Err(_)) => Err(anyhow!("等待 app-server 响应时通道关闭")),
429            Err(_) => {
430                self.pending.lock().await.remove(&key);
431                Err(anyhow!("等待 app-server 响应超时"))
432            }
433        }
434    }
435
436    async fn respond(&self, id: Value, result: Value) -> Result<()> {
437        self.send_json(json!({
438            "jsonrpc": "2.0",
439            "id": id,
440            "result": result,
441        }))
442        .await
443    }
444
445    async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
446        self.send_json(json!({
447            "jsonrpc": "2.0",
448            "id": id,
449            "error": {
450                "code": code,
451                "message": message,
452            }
453        }))
454        .await
455    }
456
457    async fn send_json(&self, payload: Value) -> Result<()> {
458        let line = serde_json::to_string(&payload)?;
459        let mut writer = self.stdin.lock().await;
460        writer.write_all(line.as_bytes()).await?;
461        writer.write_all(b"\n").await?;
462        writer.flush().await?;
463        Ok(())
464    }
465
466    async fn send_notification(&self, method: &str, params: Option<Value>) -> Result<()> {
467        let mut payload = json!({
468            "jsonrpc": "2.0",
469            "method": method,
470        });
471        if let Some(params) = params {
472            payload["params"] = params;
473        }
474        self.send_json(payload).await
475    }
476
477    async fn notify_initialized(&self) -> Result<()> {
478        self.send_notification("initialized", None).await
479    }
480}
481
482fn spawn_stdout_task(
483    running: Arc<RunningAppServer>,
484    runtime_id: String,
485    stdout: tokio::process::ChildStdout,
486    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
487) {
488    tokio::spawn(async move {
489        let mut reader = BufReader::new(stdout).lines();
490        loop {
491            match reader.next_line().await {
492                Ok(Some(line)) => {
493                    if line.trim().is_empty() {
494                        continue;
495                    }
496                    if let Err(error) =
497                        handle_stdout_line(&running, &runtime_id, &inbound_tx, &line).await
498                    {
499                        warn!("解析 app-server 输出失败: {error}");
500                    }
501                }
502                Ok(None) => break,
503                Err(error) => {
504                    warn!("读取 app-server stdout 失败: {error}");
505                    break;
506                }
507            }
508        }
509    });
510}
511
512fn spawn_stderr_task(
513    runtime_id: String,
514    stderr: tokio::process::ChildStderr,
515    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
516) {
517    tokio::spawn(async move {
518        let mut reader = BufReader::new(stderr).lines();
519        loop {
520            match reader.next_line().await {
521                Ok(Some(line)) => {
522                    let trimmed = line.trim();
523                    if !trimmed.is_empty() {
524                        let (level, message) = parse_app_server_stderr_line(trimmed);
525                        warn!("app-server stderr [{runtime_id}]: {trimmed}");
526                        let _ = inbound_tx.send(AppServerInbound::LogChunk {
527                            runtime_id: runtime_id.clone(),
528                            stream: "stderr".to_string(),
529                            level,
530                            source: "app-server".to_string(),
531                            message,
532                            detail: None,
533                            occurred_at_ms: now_millis(),
534                        });
535                    }
536                }
537                Ok(None) => break,
538                Err(error) => {
539                    warn!("读取 app-server stderr 失败: {error}");
540                    break;
541                }
542            }
543        }
544    });
545}
546
547fn spawn_wait_task(
548    running: Arc<RunningAppServer>,
549    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
550) {
551    tokio::spawn(async move {
552        let message = loop {
553            let maybe_exit = {
554                let mut guard = running.child.lock().await;
555                let Some(child) = guard.as_mut() else {
556                    break format!("app-server 已退出: runtime {}", running.runtime_id);
557                };
558                match child.try_wait() {
559                    Ok(Some(status)) => {
560                        *guard = None;
561                        Some(Ok(status))
562                    }
563                    Ok(None) => None,
564                    Err(error) => {
565                        *guard = None;
566                        Some(Err(error))
567                    }
568                }
569            };
570
571            match maybe_exit {
572                Some(Ok(status)) => break format!("app-server 已退出: {status}"),
573                Some(Err(error)) => break format!("等待 app-server 退出失败: {error}"),
574                None => sleep(Duration::from_millis(300)).await,
575            }
576        };
577
578        running.alive.store(false, Ordering::SeqCst);
579        fail_pending_requests(&running, &message).await;
580        let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
581            runtime_id: running.runtime_id.clone(),
582            pid: None,
583            running: false,
584        });
585        let _ = inbound_tx.send(AppServerInbound::Exited {
586            runtime_id: running.runtime_id.clone(),
587            message,
588            expected: running.stopping.load(Ordering::SeqCst),
589        });
590    });
591}
592
593async fn handle_stdout_line(
594    running: &Arc<RunningAppServer>,
595    runtime_id: &str,
596    inbound_tx: &mpsc::UnboundedSender<AppServerInbound>,
597    line: &str,
598) -> Result<()> {
599    let message: Value = serde_json::from_str(line)?;
600    let method = message.get("method").and_then(Value::as_str);
601    let id = message.get("id").cloned();
602    let result = message.get("result").cloned();
603    let error = message.get("error").cloned();
604
605    match (method, id, result, error) {
606        (Some(method), Some(id), None, None) => {
607            let params = message.get("params").cloned().unwrap_or(Value::Null);
608            let _ = inbound_tx.send(AppServerInbound::ServerRequest {
609                runtime_id: runtime_id.to_string(),
610                id,
611                method: method.to_string(),
612                params,
613            });
614        }
615        (Some(method), None, None, None) => {
616            let params = message.get("params").cloned().unwrap_or(Value::Null);
617            let _ = inbound_tx.send(AppServerInbound::Notification {
618                runtime_id: runtime_id.to_string(),
619                method: method.to_string(),
620                params,
621            });
622        }
623        (None, Some(id), Some(result), _) => {
624            let key = json_string(&id);
625            if let Some(sender) = running.pending.lock().await.remove(&key) {
626                let _ = sender.send(Ok(result));
627            }
628        }
629        (None, Some(id), _, Some(error_value)) => {
630            let key = json_string(&id);
631            if let Some(sender) = running.pending.lock().await.remove(&key) {
632                let payload = serde_json::from_value::<RpcErrorPayload>(error_value)?;
633                let _ = sender.send(Err(payload));
634            }
635        }
636        _ => {
637            warn!("收到未知 app-server 消息: {line}");
638        }
639    }
640
641    Ok(())
642}
643
644async fn fail_pending_requests(running: &RunningAppServer, message: &str) {
645    let mut pending = running.pending.lock().await;
646    for (_, sender) in pending.drain() {
647        let _ = sender.send(Err(RpcErrorPayload {
648            code: -32001,
649            message: message.to_string(),
650            data: None,
651        }));
652    }
653}
654
655fn parse_initialize_info(value: &Value) -> Result<InitializeInfo> {
656    Ok(InitializeInfo {
657        user_agent: required_string(value, "userAgent")?.to_string(),
658        codex_home: required_string(value, "codexHome")?.to_string(),
659        platform_family: required_string(value, "platformFamily")?.to_string(),
660        platform_os: required_string(value, "platformOs")?.to_string(),
661    })
662}
663
664fn required_string<'a>(value: &'a Value, key: &str) -> Result<&'a str> {
665    value
666        .get(key)
667        .and_then(Value::as_str)
668        .with_context(|| format!("缺少字段 {key}"))
669}
670
671fn build_spawn_error_context(launch_config: &AppServerLaunchConfig) -> String {
672    let cwd = std::env::current_dir()
673        .map(|path| path.display().to_string())
674        .unwrap_or_else(|_| "<unknown>".to_string());
675    let path_env = std::env::var("PATH").unwrap_or_else(|_| "<unset>".to_string());
676    let codex_home = launch_config
677        .codex_home
678        .as_ref()
679        .map(|path| path.display().to_string())
680        .unwrap_or_else(|| "<unset>".to_string());
681    format!(
682        "启动 {} app-server 失败(runtime={} cwd={} CODEX_HOME={} PATH={})",
683        launch_config.codex_binary, launch_config.runtime_id, cwd, codex_home, path_env
684    )
685}
686
687fn parse_app_server_stderr_line(line: &str) -> (String, String) {
688    let normalized = line.trim().to_string();
689    let upper = normalized.to_uppercase();
690    let level = if upper.contains(" ERROR ") || upper.starts_with("ERROR ") {
691        "error"
692    } else if upper.contains(" WARN ") || upper.starts_with("WARN ") || upper.contains(" WARNING ")
693    {
694        "warn"
695    } else if upper.contains(" DEBUG ") || upper.starts_with("DEBUG ") {
696        "debug"
697    } else {
698        "info"
699    };
700    (level.to_string(), normalized)
701}
702
703fn default_opt_out_notification_methods() -> Vec<String> {
704    APP_SERVER_OPT_OUT_NOTIFICATION_METHODS
705        .iter()
706        .map(|method| (*method).to_string())
707        .collect()
708}
709
710#[cfg(test)]
711mod tests {
712    use std::fs;
713    use std::os::unix::fs::PermissionsExt;
714
715    use tempfile::tempdir;
716    use tokio::sync::mpsc;
717    use tokio::time::{Duration, timeout};
718
719    use super::{
720        APP_SERVER_EXPERIMENTAL_API_ENABLED, AppServerInbound, AppServerLaunchConfig,
721        AppServerManager, default_opt_out_notification_methods,
722    };
723
724    #[tokio::test]
725    async fn start_sends_initialize_initialized_and_opt_out_notifications() {
726        let temp_dir = tempdir().expect("创建临时目录失败");
727        let log_path = temp_dir.path().join("received.json");
728        let script_path = temp_dir.path().join("fake-codex");
729        fs::write(
730            &script_path,
731            format!(
732                r#"#!/usr/bin/env python3
733import json
734import pathlib
735import sys
736
737messages = []
738log_path = pathlib.Path({log_path:?})
739for raw_line in sys.stdin:
740    line = raw_line.strip()
741    if not line:
742        continue
743    message = json.loads(line)
744    messages.append(message)
745    if message.get("method") == "initialize":
746        print(json.dumps({{
747            "jsonrpc": "2.0",
748            "id": message["id"],
749            "result": {{
750                "userAgent": "codex-test",
751                "codexHome": "/tmp/codex-home",
752                "platformFamily": "unix",
753                "platformOs": "linux"
754            }}
755        }}), flush=True)
756    elif message.get("method") == "initialized":
757        log_path.write_text(json.dumps(messages))
758        break
759"#,
760                log_path = log_path.display().to_string(),
761            ),
762        )
763        .expect("写入 fake codex 脚本失败");
764        let mut permissions = fs::metadata(&script_path)
765            .expect("读取脚本权限失败")
766            .permissions();
767        permissions.set_mode(0o755);
768        fs::set_permissions(&script_path, permissions).expect("设置脚本权限失败");
769
770        let (inbound_tx, mut inbound_rx) = mpsc::unbounded_channel();
771        let manager = AppServerManager::new(
772            AppServerLaunchConfig {
773                runtime_id: "primary".to_string(),
774                codex_binary: script_path.display().to_string(),
775                codex_home: None,
776            },
777            inbound_tx,
778        );
779
780        manager.start().await.expect("启动 app-server manager 失败");
781
782        let initialized = timeout(Duration::from_secs(5), async {
783            loop {
784                match inbound_rx.recv().await {
785                    Some(AppServerInbound::Initialized {
786                        experimental_api_enabled,
787                        opt_out_notification_methods,
788                        ..
789                    }) => break (experimental_api_enabled, opt_out_notification_methods),
790                    Some(_) => {}
791                    None => panic!("inbound channel 意外关闭"),
792                }
793            }
794        })
795        .await
796        .expect("等待 Initialized 事件超时");
797
798        assert!(initialized.0);
799        assert_eq!(initialized.1, default_opt_out_notification_methods());
800
801        let recorded = timeout(Duration::from_secs(5), async {
802            loop {
803                if log_path.exists() {
804                    break fs::read_to_string(&log_path).expect("读取记录文件失败");
805                }
806                tokio::time::sleep(Duration::from_millis(50)).await;
807            }
808        })
809        .await
810        .expect("等待 fake codex 记录消息超时");
811        let messages: serde_json::Value =
812            serde_json::from_str(&recorded).expect("解析记录消息失败");
813        let entries = messages.as_array().expect("记录消息应为数组");
814
815        assert_eq!(entries.len(), 2);
816        assert_eq!(entries[0]["method"], "initialize");
817        assert_eq!(
818            entries[0]["params"]["capabilities"]["experimentalApi"],
819            APP_SERVER_EXPERIMENTAL_API_ENABLED
820        );
821        assert_eq!(
822            entries[0]["params"]["capabilities"]["optOutNotificationMethods"],
823            serde_json::to_value(default_opt_out_notification_methods()).expect("序列化 opt-out 列表失败")
824        );
825        assert_eq!(entries[1]["method"], "initialized");
826    }
827}