1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{
4 Arc,
5 atomic::{AtomicBool, AtomicU64, Ordering},
6};
7
8use anyhow::{Context, Result, anyhow, bail};
9use serde::{Deserialize, Serialize};
10use serde_json::{Value, json};
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
12use tokio::process::{Child, ChildStdin, Command};
13use tokio::sync::{Mutex, mpsc, oneshot};
14use tokio::time::{Duration, sleep, timeout};
15use tracing::warn;
16
17use crate::bridge_protocol::{json_string, now_millis};
18
19const APP_SERVER_EXPERIMENTAL_API_ENABLED: bool = true;
20const APP_SERVER_OPT_OUT_NOTIFICATION_METHODS: &[&str] = &[
21 "account/login/completed",
22 "account/rateLimits/updated",
23 "account/updated",
24 "app/list/updated",
25 "fs/changed",
26 "fuzzyFileSearch/sessionCompleted",
27 "fuzzyFileSearch/sessionUpdated",
28 "hook/completed",
29 "hook/started",
30 "item/autoApprovalReview/completed",
31 "item/autoApprovalReview/started",
32 "item/commandExecution/terminalInteraction",
33 "mcpServer/oauthLogin/completed",
34 "mcpServer/startupStatus/updated",
35 "skills/changed",
36 "thread/compacted",
37 "thread/realtime/closed",
38 "thread/realtime/error",
39 "thread/realtime/itemAdded",
40 "thread/realtime/outputAudio/delta",
41 "thread/realtime/sdp",
42 "thread/realtime/started",
43 "thread/realtime/transcriptUpdated",
44 "windows/worldWritableWarning",
45 "windowsSandbox/setupCompleted",
46];
47
48#[derive(Debug, Clone)]
49pub struct AppServerLaunchConfig {
50 pub runtime_id: String,
51 pub codex_binary: String,
52 pub codex_home: Option<PathBuf>,
53}
54
55#[derive(Debug, Clone)]
56pub struct InitializeInfo {
57 pub user_agent: String,
58 pub codex_home: String,
59 pub platform_family: String,
60 pub platform_os: String,
61}
62
63#[derive(Debug, Clone)]
64pub enum AppServerInbound {
65 Starting {
66 runtime_id: String,
67 },
68 ProcessChanged {
69 runtime_id: String,
70 pid: Option<u32>,
71 running: bool,
72 },
73 Initializing {
74 runtime_id: String,
75 experimental_api_enabled: bool,
76 opt_out_notification_methods: Vec<String>,
77 },
78 Initialized {
79 runtime_id: String,
80 info: InitializeInfo,
81 experimental_api_enabled: bool,
82 opt_out_notification_methods: Vec<String>,
83 },
84 HandshakeFailed {
85 runtime_id: String,
86 message: String,
87 experimental_api_enabled: bool,
88 opt_out_notification_methods: Vec<String>,
89 },
90 Notification {
91 runtime_id: String,
92 method: String,
93 params: Value,
94 },
95 ServerRequest {
96 runtime_id: String,
97 id: Value,
98 method: String,
99 params: Value,
100 },
101 Exited {
102 runtime_id: String,
103 message: String,
104 expected: bool,
105 },
106 LogChunk {
107 runtime_id: String,
108 stream: String,
109 level: String,
110 source: String,
111 message: String,
112 detail: Option<Value>,
113 occurred_at_ms: i64,
114 },
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct RpcErrorPayload {
119 pub code: i64,
120 pub message: String,
121 #[serde(default)]
122 pub data: Option<Value>,
123}
124
125impl std::fmt::Display for RpcErrorPayload {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 write!(f, "[{}] {}", self.code, self.message)
128 }
129}
130
131pub struct AppServerManager {
132 launch_config: AppServerLaunchConfig,
133 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
134 inner: Mutex<Option<Arc<RunningAppServer>>>,
135}
136
137impl AppServerManager {
138 pub fn new(
139 launch_config: AppServerLaunchConfig,
140 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
141 ) -> Self {
142 Self {
143 launch_config,
144 inbound_tx,
145 inner: Mutex::new(None),
146 }
147 }
148
149 pub fn runtime_id(&self) -> &str {
150 &self.launch_config.runtime_id
151 }
152
153 pub async fn start(&self) -> Result<()> {
154 self.ensure_started().await.map(|_| ())
155 }
156
157 pub async fn stop(&self) -> Result<()> {
158 let existing = {
159 let mut guard = self.inner.lock().await;
160 guard.take()
161 };
162
163 if let Some(existing) = existing {
164 existing.stop().await?;
165 for _ in 0..30 {
166 if !existing.is_alive() {
167 break;
168 }
169 sleep(Duration::from_millis(100)).await;
170 }
171 }
172
173 Ok(())
174 }
175
176 pub async fn restart(&self) -> Result<()> {
177 self.stop().await?;
178 self.start().await
179 }
180
181 pub async fn request(&self, method: &str, params: Value) -> Result<Value> {
182 let running = self.ensure_started().await?;
183 running.request(method, params).await
184 }
185
186 pub async fn respond(&self, id: Value, result: Value) -> Result<()> {
187 let running = self.ensure_started().await?;
188 running.respond(id, result).await
189 }
190
191 pub async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
192 let running = self.ensure_started().await?;
193 running.respond_error(id, code, message).await
194 }
195
196 async fn ensure_started(&self) -> Result<Arc<RunningAppServer>> {
197 {
198 let guard = self.inner.lock().await;
199 if let Some(existing) = guard.as_ref().filter(|existing| existing.is_alive()) {
200 return Ok(Arc::clone(existing));
201 }
202 }
203
204 let opt_out_notification_methods = default_opt_out_notification_methods();
205 let _ = self.inbound_tx.send(AppServerInbound::Starting {
206 runtime_id: self.launch_config.runtime_id.clone(),
207 });
208 let running =
209 RunningAppServer::spawn(self.launch_config.clone(), self.inbound_tx.clone()).await?;
210 {
211 let mut guard = self.inner.lock().await;
212 *guard = Some(Arc::clone(&running));
213 }
214 let _ = self.inbound_tx.send(AppServerInbound::Initializing {
215 runtime_id: self.launch_config.runtime_id.clone(),
216 experimental_api_enabled: APP_SERVER_EXPERIMENTAL_API_ENABLED,
217 opt_out_notification_methods: opt_out_notification_methods.clone(),
218 });
219
220 let init_result = match running
221 .request(
222 "initialize",
223 json!({
224 "clientInfo": {
225 "name": "codex-mobile-bridge",
226 "title": "Codex Mobile Bridge",
227 "version": env!("CARGO_PKG_VERSION"),
228 },
229 "capabilities": {
230 "experimentalApi": APP_SERVER_EXPERIMENTAL_API_ENABLED,
231 "optOutNotificationMethods": opt_out_notification_methods,
232 }
233 }),
234 )
235 .await
236 {
237 Ok(result) => result,
238 Err(error) => {
239 self.abort_startup(
240 &running,
241 APP_SERVER_EXPERIMENTAL_API_ENABLED,
242 &default_opt_out_notification_methods(),
243 format!("initialize 失败: {error}"),
244 )
245 .await;
246 return Err(error);
247 }
248 };
249
250 let info = match parse_initialize_info(&init_result) {
251 Ok(info) => info,
252 Err(error) => {
253 self.abort_startup(
254 &running,
255 APP_SERVER_EXPERIMENTAL_API_ENABLED,
256 &default_opt_out_notification_methods(),
257 format!("initialize 响应解析失败: {error}"),
258 )
259 .await;
260 return Err(error);
261 }
262 };
263 if let Err(error) = running.notify_initialized().await {
264 self.abort_startup(
265 &running,
266 APP_SERVER_EXPERIMENTAL_API_ENABLED,
267 &default_opt_out_notification_methods(),
268 format!("initialized 发送失败: {error}"),
269 )
270 .await;
271 return Err(error);
272 }
273 let _ = self.inbound_tx.send(AppServerInbound::Initialized {
274 runtime_id: self.launch_config.runtime_id.clone(),
275 info,
276 experimental_api_enabled: APP_SERVER_EXPERIMENTAL_API_ENABLED,
277 opt_out_notification_methods: default_opt_out_notification_methods(),
278 });
279 Ok(running)
280 }
281
282 async fn abort_startup(
283 &self,
284 running: &Arc<RunningAppServer>,
285 experimental_api_enabled: bool,
286 opt_out_notification_methods: &[String],
287 message: String,
288 ) {
289 let _ = self.inbound_tx.send(AppServerInbound::HandshakeFailed {
290 runtime_id: self.launch_config.runtime_id.clone(),
291 message,
292 experimental_api_enabled,
293 opt_out_notification_methods: opt_out_notification_methods.to_vec(),
294 });
295 let _ = running.abort().await;
296 let mut guard = self.inner.lock().await;
297 if guard
298 .as_ref()
299 .map(|existing| Arc::ptr_eq(existing, running))
300 .unwrap_or(false)
301 {
302 *guard = None;
303 }
304 }
305}
306
307struct RunningAppServer {
308 runtime_id: String,
309 stdin: Arc<Mutex<BufWriter<ChildStdin>>>,
310 child: Arc<Mutex<Option<Child>>>,
311 pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Value, RpcErrorPayload>>>>>,
312 next_id: AtomicU64,
313 alive: Arc<AtomicBool>,
314 stopping: Arc<AtomicBool>,
315}
316
317impl RunningAppServer {
318 async fn spawn(
319 launch_config: AppServerLaunchConfig,
320 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
321 ) -> Result<Arc<Self>> {
322 let mut command = Command::new(&launch_config.codex_binary);
323 command.args(["app-server", "--listen", "stdio://"]);
324 command.stdin(std::process::Stdio::piped());
325 command.stdout(std::process::Stdio::piped());
326 command.stderr(std::process::Stdio::piped());
327 command.kill_on_drop(true);
328 command.env("CODEX_MOBILE_MANAGED", "1");
329 command.env("CODEX_MOBILE_RUNTIME_ID", &launch_config.runtime_id);
330
331 if let Some(codex_home) = launch_config.codex_home.as_ref() {
332 command.env("CODEX_HOME", codex_home);
333 }
334
335 let mut child = command
336 .spawn()
337 .with_context(|| build_spawn_error_context(&launch_config))?;
338 let pid = child.id();
339
340 let stdin = child.stdin.take().context("获取 app-server stdin 失败")?;
341 let stdout = child.stdout.take().context("获取 app-server stdout 失败")?;
342 let stderr = child.stderr.take().context("获取 app-server stderr 失败")?;
343
344 let running = Arc::new(Self {
345 runtime_id: launch_config.runtime_id.clone(),
346 stdin: Arc::new(Mutex::new(BufWriter::new(stdin))),
347 child: Arc::new(Mutex::new(Some(child))),
348 pending: Arc::new(Mutex::new(HashMap::new())),
349 next_id: AtomicU64::new(1),
350 alive: Arc::new(AtomicBool::new(true)),
351 stopping: Arc::new(AtomicBool::new(false)),
352 });
353
354 let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
355 runtime_id: launch_config.runtime_id.clone(),
356 pid,
357 running: true,
358 });
359
360 spawn_stdout_task(
361 Arc::clone(&running),
362 launch_config.runtime_id.clone(),
363 stdout,
364 inbound_tx.clone(),
365 );
366 spawn_stderr_task(launch_config.runtime_id.clone(), stderr, inbound_tx.clone());
367 spawn_wait_task(Arc::clone(&running), inbound_tx);
368
369 Ok(running)
370 }
371
372 fn is_alive(&self) -> bool {
373 self.alive.load(Ordering::SeqCst)
374 }
375
376 async fn stop(&self) -> Result<()> {
377 if !self.is_alive() {
378 return Ok(());
379 }
380
381 self.stopping.store(true, Ordering::SeqCst);
382 self.kill_process().await
383 }
384
385 async fn abort(&self) -> Result<()> {
386 if !self.is_alive() {
387 return Ok(());
388 }
389
390 self.kill_process().await
391 }
392
393 async fn kill_process(&self) -> Result<()> {
394 let mut child_guard = self.child.lock().await;
395 let Some(child) = child_guard.as_mut() else {
396 return Ok(());
397 };
398 child
399 .start_kill()
400 .with_context(|| format!("停止 runtime {} 失败", self.runtime_id))?;
401 Ok(())
402 }
403
404 async fn request(&self, method: &str, params: Value) -> Result<Value> {
405 if !self.is_alive() {
406 bail!("app-server 未运行");
407 }
408
409 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
410 let key = id.to_string();
411 let (tx, rx) = oneshot::channel();
412 {
413 let mut pending = self.pending.lock().await;
414 pending.insert(key.clone(), tx);
415 }
416
417 self.send_json(json!({
418 "jsonrpc": "2.0",
419 "id": id,
420 "method": method,
421 "params": params,
422 }))
423 .await?;
424
425 match timeout(Duration::from_secs(90), rx).await {
426 Ok(Ok(Ok(result))) => Ok(result),
427 Ok(Ok(Err(error))) => Err(anyhow!(error.to_string())),
428 Ok(Err(_)) => Err(anyhow!("等待 app-server 响应时通道关闭")),
429 Err(_) => {
430 self.pending.lock().await.remove(&key);
431 Err(anyhow!("等待 app-server 响应超时"))
432 }
433 }
434 }
435
436 async fn respond(&self, id: Value, result: Value) -> Result<()> {
437 self.send_json(json!({
438 "jsonrpc": "2.0",
439 "id": id,
440 "result": result,
441 }))
442 .await
443 }
444
445 async fn respond_error(&self, id: Value, code: i64, message: &str) -> Result<()> {
446 self.send_json(json!({
447 "jsonrpc": "2.0",
448 "id": id,
449 "error": {
450 "code": code,
451 "message": message,
452 }
453 }))
454 .await
455 }
456
457 async fn send_json(&self, payload: Value) -> Result<()> {
458 let line = serde_json::to_string(&payload)?;
459 let mut writer = self.stdin.lock().await;
460 writer.write_all(line.as_bytes()).await?;
461 writer.write_all(b"\n").await?;
462 writer.flush().await?;
463 Ok(())
464 }
465
466 async fn send_notification(&self, method: &str, params: Option<Value>) -> Result<()> {
467 let mut payload = json!({
468 "jsonrpc": "2.0",
469 "method": method,
470 });
471 if let Some(params) = params {
472 payload["params"] = params;
473 }
474 self.send_json(payload).await
475 }
476
477 async fn notify_initialized(&self) -> Result<()> {
478 self.send_notification("initialized", None).await
479 }
480}
481
482fn spawn_stdout_task(
483 running: Arc<RunningAppServer>,
484 runtime_id: String,
485 stdout: tokio::process::ChildStdout,
486 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
487) {
488 tokio::spawn(async move {
489 let mut reader = BufReader::new(stdout).lines();
490 loop {
491 match reader.next_line().await {
492 Ok(Some(line)) => {
493 if line.trim().is_empty() {
494 continue;
495 }
496 if let Err(error) =
497 handle_stdout_line(&running, &runtime_id, &inbound_tx, &line).await
498 {
499 warn!("解析 app-server 输出失败: {error}");
500 }
501 }
502 Ok(None) => break,
503 Err(error) => {
504 warn!("读取 app-server stdout 失败: {error}");
505 break;
506 }
507 }
508 }
509 });
510}
511
512fn spawn_stderr_task(
513 runtime_id: String,
514 stderr: tokio::process::ChildStderr,
515 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
516) {
517 tokio::spawn(async move {
518 let mut reader = BufReader::new(stderr).lines();
519 loop {
520 match reader.next_line().await {
521 Ok(Some(line)) => {
522 let trimmed = line.trim();
523 if !trimmed.is_empty() {
524 let (level, message) = parse_app_server_stderr_line(trimmed);
525 warn!("app-server stderr [{runtime_id}]: {trimmed}");
526 let _ = inbound_tx.send(AppServerInbound::LogChunk {
527 runtime_id: runtime_id.clone(),
528 stream: "stderr".to_string(),
529 level,
530 source: "app-server".to_string(),
531 message,
532 detail: None,
533 occurred_at_ms: now_millis(),
534 });
535 }
536 }
537 Ok(None) => break,
538 Err(error) => {
539 warn!("读取 app-server stderr 失败: {error}");
540 break;
541 }
542 }
543 }
544 });
545}
546
547fn spawn_wait_task(
548 running: Arc<RunningAppServer>,
549 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
550) {
551 tokio::spawn(async move {
552 let message = loop {
553 let maybe_exit = {
554 let mut guard = running.child.lock().await;
555 let Some(child) = guard.as_mut() else {
556 break format!("app-server 已退出: runtime {}", running.runtime_id);
557 };
558 match child.try_wait() {
559 Ok(Some(status)) => {
560 *guard = None;
561 Some(Ok(status))
562 }
563 Ok(None) => None,
564 Err(error) => {
565 *guard = None;
566 Some(Err(error))
567 }
568 }
569 };
570
571 match maybe_exit {
572 Some(Ok(status)) => break format!("app-server 已退出: {status}"),
573 Some(Err(error)) => break format!("等待 app-server 退出失败: {error}"),
574 None => sleep(Duration::from_millis(300)).await,
575 }
576 };
577
578 running.alive.store(false, Ordering::SeqCst);
579 fail_pending_requests(&running, &message).await;
580 let _ = inbound_tx.send(AppServerInbound::ProcessChanged {
581 runtime_id: running.runtime_id.clone(),
582 pid: None,
583 running: false,
584 });
585 let _ = inbound_tx.send(AppServerInbound::Exited {
586 runtime_id: running.runtime_id.clone(),
587 message,
588 expected: running.stopping.load(Ordering::SeqCst),
589 });
590 });
591}
592
593async fn handle_stdout_line(
594 running: &Arc<RunningAppServer>,
595 runtime_id: &str,
596 inbound_tx: &mpsc::UnboundedSender<AppServerInbound>,
597 line: &str,
598) -> Result<()> {
599 let message: Value = serde_json::from_str(line)?;
600 let method = message.get("method").and_then(Value::as_str);
601 let id = message.get("id").cloned();
602 let result = message.get("result").cloned();
603 let error = message.get("error").cloned();
604
605 match (method, id, result, error) {
606 (Some(method), Some(id), None, None) => {
607 let params = message.get("params").cloned().unwrap_or(Value::Null);
608 let _ = inbound_tx.send(AppServerInbound::ServerRequest {
609 runtime_id: runtime_id.to_string(),
610 id,
611 method: method.to_string(),
612 params,
613 });
614 }
615 (Some(method), None, None, None) => {
616 let params = message.get("params").cloned().unwrap_or(Value::Null);
617 let _ = inbound_tx.send(AppServerInbound::Notification {
618 runtime_id: runtime_id.to_string(),
619 method: method.to_string(),
620 params,
621 });
622 }
623 (None, Some(id), Some(result), _) => {
624 let key = json_string(&id);
625 if let Some(sender) = running.pending.lock().await.remove(&key) {
626 let _ = sender.send(Ok(result));
627 }
628 }
629 (None, Some(id), _, Some(error_value)) => {
630 let key = json_string(&id);
631 if let Some(sender) = running.pending.lock().await.remove(&key) {
632 let payload = serde_json::from_value::<RpcErrorPayload>(error_value)?;
633 let _ = sender.send(Err(payload));
634 }
635 }
636 _ => {
637 warn!("收到未知 app-server 消息: {line}");
638 }
639 }
640
641 Ok(())
642}
643
644async fn fail_pending_requests(running: &RunningAppServer, message: &str) {
645 let mut pending = running.pending.lock().await;
646 for (_, sender) in pending.drain() {
647 let _ = sender.send(Err(RpcErrorPayload {
648 code: -32001,
649 message: message.to_string(),
650 data: None,
651 }));
652 }
653}
654
655fn parse_initialize_info(value: &Value) -> Result<InitializeInfo> {
656 Ok(InitializeInfo {
657 user_agent: required_string(value, "userAgent")?.to_string(),
658 codex_home: required_string(value, "codexHome")?.to_string(),
659 platform_family: required_string(value, "platformFamily")?.to_string(),
660 platform_os: required_string(value, "platformOs")?.to_string(),
661 })
662}
663
664fn required_string<'a>(value: &'a Value, key: &str) -> Result<&'a str> {
665 value
666 .get(key)
667 .and_then(Value::as_str)
668 .with_context(|| format!("缺少字段 {key}"))
669}
670
671fn build_spawn_error_context(launch_config: &AppServerLaunchConfig) -> String {
672 let cwd = std::env::current_dir()
673 .map(|path| path.display().to_string())
674 .unwrap_or_else(|_| "<unknown>".to_string());
675 let path_env = std::env::var("PATH").unwrap_or_else(|_| "<unset>".to_string());
676 let codex_home = launch_config
677 .codex_home
678 .as_ref()
679 .map(|path| path.display().to_string())
680 .unwrap_or_else(|| "<unset>".to_string());
681 format!(
682 "启动 {} app-server 失败(runtime={} cwd={} CODEX_HOME={} PATH={})",
683 launch_config.codex_binary, launch_config.runtime_id, cwd, codex_home, path_env
684 )
685}
686
687fn parse_app_server_stderr_line(line: &str) -> (String, String) {
688 let normalized = line.trim().to_string();
689 let upper = normalized.to_uppercase();
690 let level = if upper.contains(" ERROR ") || upper.starts_with("ERROR ") {
691 "error"
692 } else if upper.contains(" WARN ") || upper.starts_with("WARN ") || upper.contains(" WARNING ")
693 {
694 "warn"
695 } else if upper.contains(" DEBUG ") || upper.starts_with("DEBUG ") {
696 "debug"
697 } else {
698 "info"
699 };
700 (level.to_string(), normalized)
701}
702
703fn default_opt_out_notification_methods() -> Vec<String> {
704 APP_SERVER_OPT_OUT_NOTIFICATION_METHODS
705 .iter()
706 .map(|method| (*method).to_string())
707 .collect()
708}
709
710#[cfg(test)]
711mod tests {
712 use std::fs;
713 use std::os::unix::fs::PermissionsExt;
714
715 use tempfile::tempdir;
716 use tokio::sync::mpsc;
717 use tokio::time::{Duration, timeout};
718
719 use super::{
720 APP_SERVER_EXPERIMENTAL_API_ENABLED, AppServerInbound, AppServerLaunchConfig,
721 AppServerManager, default_opt_out_notification_methods,
722 };
723
724 #[tokio::test]
725 async fn start_sends_initialize_initialized_and_opt_out_notifications() {
726 let temp_dir = tempdir().expect("创建临时目录失败");
727 let log_path = temp_dir.path().join("received.json");
728 let script_path = temp_dir.path().join("fake-codex");
729 fs::write(
730 &script_path,
731 format!(
732 r#"#!/usr/bin/env python3
733import json
734import pathlib
735import sys
736
737messages = []
738log_path = pathlib.Path({log_path:?})
739for raw_line in sys.stdin:
740 line = raw_line.strip()
741 if not line:
742 continue
743 message = json.loads(line)
744 messages.append(message)
745 if message.get("method") == "initialize":
746 print(json.dumps({{
747 "jsonrpc": "2.0",
748 "id": message["id"],
749 "result": {{
750 "userAgent": "codex-test",
751 "codexHome": "/tmp/codex-home",
752 "platformFamily": "unix",
753 "platformOs": "linux"
754 }}
755 }}), flush=True)
756 elif message.get("method") == "initialized":
757 log_path.write_text(json.dumps(messages))
758 break
759"#,
760 log_path = log_path.display().to_string(),
761 ),
762 )
763 .expect("写入 fake codex 脚本失败");
764 let mut permissions = fs::metadata(&script_path)
765 .expect("读取脚本权限失败")
766 .permissions();
767 permissions.set_mode(0o755);
768 fs::set_permissions(&script_path, permissions).expect("设置脚本权限失败");
769
770 let (inbound_tx, mut inbound_rx) = mpsc::unbounded_channel();
771 let manager = AppServerManager::new(
772 AppServerLaunchConfig {
773 runtime_id: "primary".to_string(),
774 codex_binary: script_path.display().to_string(),
775 codex_home: None,
776 },
777 inbound_tx,
778 );
779
780 manager.start().await.expect("启动 app-server manager 失败");
781
782 let initialized = timeout(Duration::from_secs(5), async {
783 loop {
784 match inbound_rx.recv().await {
785 Some(AppServerInbound::Initialized {
786 experimental_api_enabled,
787 opt_out_notification_methods,
788 ..
789 }) => break (experimental_api_enabled, opt_out_notification_methods),
790 Some(_) => {}
791 None => panic!("inbound channel 意外关闭"),
792 }
793 }
794 })
795 .await
796 .expect("等待 Initialized 事件超时");
797
798 assert!(initialized.0);
799 assert_eq!(initialized.1, default_opt_out_notification_methods());
800
801 let recorded = timeout(Duration::from_secs(5), async {
802 loop {
803 if log_path.exists() {
804 break fs::read_to_string(&log_path).expect("读取记录文件失败");
805 }
806 tokio::time::sleep(Duration::from_millis(50)).await;
807 }
808 })
809 .await
810 .expect("等待 fake codex 记录消息超时");
811 let messages: serde_json::Value =
812 serde_json::from_str(&recorded).expect("解析记录消息失败");
813 let entries = messages.as_array().expect("记录消息应为数组");
814
815 assert_eq!(entries.len(), 2);
816 assert_eq!(entries[0]["method"], "initialize");
817 assert_eq!(
818 entries[0]["params"]["capabilities"]["experimentalApi"],
819 APP_SERVER_EXPERIMENTAL_API_ENABLED
820 );
821 assert_eq!(
822 entries[0]["params"]["capabilities"]["optOutNotificationMethods"],
823 serde_json::to_value(default_opt_out_notification_methods()).expect("序列化 opt-out 列表失败")
824 );
825 assert_eq!(entries[1]["method"], "initialized");
826 }
827}