codex-mobile-bridge 0.3.10

Remote bridge and service manager for codex-mobile.
Documentation
use std::sync::{Arc, atomic::Ordering};

use anyhow::Result;
use serde_json::Value;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::warn;

use super::handshake::parse_app_server_stderr_line;
use super::{AppServerInbound, RpcErrorPayload, RunningAppServer};
use crate::bridge_protocol::{json_string, now_millis};

/// Spawns a background task that reads the app-server's stdout line by line
/// and hands every non-blank line to `handle_stdout_line`.
///
/// The task ends when stdout reaches EOF or a read fails. A line that fails
/// to be handled is only logged; reading continues with the next line.
pub(super) fn spawn_stdout_task(
    running: Arc<RunningAppServer>,
    runtime_id: String,
    stdout: tokio::process::ChildStdout,
    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
    tokio::spawn(async move {
        let mut lines = BufReader::new(stdout).lines();
        loop {
            // Pull the next line, leaving the loop on EOF or on a read error.
            let line = match lines.next_line().await {
                Ok(Some(line)) => line,
                Ok(None) => break,
                Err(error) => {
                    warn!("读取 app-server stdout 失败: {error}");
                    break;
                }
            };

            // Whitespace-only lines carry no message.
            if line.trim().is_empty() {
                continue;
            }

            let handled = handle_stdout_line(&running, &runtime_id, &inbound_tx, &line).await;
            if let Err(error) = handled {
                warn!("解析 app-server 输出失败: {error}");
            }
        }
    });
}

/// Spawns a background task that forwards the app-server's stderr as
/// `AppServerInbound::LogChunk` messages.
///
/// Each non-blank line is trimmed, classified by
/// `parse_app_server_stderr_line`, logged locally at warn level and sent on
/// `inbound_tx`. Send failures are ignored. The task ends on EOF or on an
/// I/O error.
///
/// Lines are read as raw bytes and decoded lossily. A line-oriented string
/// reader rejects non-UTF-8 input with an error, which would terminate this
/// task on the first stray byte and silently drop every later stderr line.
pub(super) fn spawn_stderr_task(
    runtime_id: String,
    stderr: tokio::process::ChildStderr,
    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
    tokio::spawn(async move {
        let mut reader = BufReader::new(stderr);
        let mut buf: Vec<u8> = Vec::new();
        loop {
            buf.clear();
            match reader.read_until(b'\n', &mut buf).await {
                // Zero bytes read means EOF.
                Ok(0) => break,
                Ok(_) => {
                    // Invalid sequences become U+FFFD instead of aborting the reader.
                    let line = String::from_utf8_lossy(&buf);
                    // `trim` also removes the trailing "\n" / "\r\n" kept by `read_until`.
                    let trimmed = line.trim();
                    if trimmed.is_empty() {
                        continue;
                    }
                    let (level, message) = parse_app_server_stderr_line(trimmed);
                    warn!("app-server stderr [{runtime_id}]: {trimmed}");
                    let _ = inbound_tx.send(AppServerInbound::LogChunk {
                        runtime_id: runtime_id.clone(),
                        stream: "stderr".to_string(),
                        level,
                        source: "app-server".to_string(),
                        message,
                        detail: None,
                        occurred_at_ms: now_millis(),
                    });
                }
                Err(error) => {
                    warn!("读取 app-server stderr 失败: {error}");
                    break;
                }
            }
        }
    });
}

/// Spawns a background task that watches the child process until it is gone
/// and then publishes the shutdown.
///
/// The child slot is polled with `try_wait` every 300 ms, taking the lock
/// only for the duration of each poll. The loop finishes when the process has
/// exited, when `try_wait` fails, or when the slot is already empty. After
/// that the task clears `alive`, fails every pending request with the exit
/// message, and sends `ProcessChanged` followed by `Exited` on `inbound_tx`
/// (send failures are ignored).
pub(super) fn spawn_wait_task(
    running: Arc<RunningAppServer>,
    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) {
    tokio::spawn(async move {
        let poll_interval = tokio::time::Duration::from_millis(300);

        let message = loop {
            // The guard lives only inside this block, so it is released
            // before the task sleeps.
            let finished: Option<String> = {
                let mut slot = running.child.lock().await;
                let polled = match slot.as_mut() {
                    Some(child) => child.try_wait(),
                    // The slot is empty: nothing left to wait for.
                    None => break format!("app-server 已退出: runtime {}", running.runtime_id),
                };
                match polled {
                    // Still running.
                    Ok(None) => None,
                    Ok(Some(status)) => {
                        *slot = None;
                        Some(format!("app-server 已退出: {status}"))
                    }
                    Err(error) => {
                        *slot = None;
                        Some(format!("等待 app-server 退出失败: {error}"))
                    }
                }
            };

            if let Some(text) = finished {
                break text;
            }
            sleep(poll_interval).await;
        };

        running.alive.store(false, Ordering::SeqCst);
        fail_pending_requests(&running, &message).await;

        let process_changed = AppServerInbound::ProcessChanged {
            runtime_id: running.runtime_id.clone(),
            pid: None,
            running: false,
        };
        let _ = inbound_tx.send(process_changed);

        let exited = AppServerInbound::Exited {
            runtime_id: running.runtime_id.clone(),
            message,
            // Reports whether the exit coincided with the `stopping` flag being set.
            expected: running.stopping.load(Ordering::SeqCst),
        };
        let _ = inbound_tx.send(exited);
    });
}

async fn handle_stdout_line(
    running: &Arc<RunningAppServer>,
    runtime_id: &str,
    inbound_tx: &mpsc::UnboundedSender<AppServerInbound>,
    line: &str,
) -> Result<()> {
    let message: Value = serde_json::from_str(line)?;
    let method = message.get("method").and_then(Value::as_str);
    let id = message.get("id").cloned();
    let result = message.get("result").cloned();
    let error = message.get("error").cloned();

    match (method, id, result, error) {
        (Some(method), Some(id), None, None) => {
            let params = message.get("params").cloned().unwrap_or(Value::Null);
            let _ = inbound_tx.send(AppServerInbound::ServerRequest {
                runtime_id: runtime_id.to_string(),
                id,
                method: method.to_string(),
                params,
            });
        }
        (Some(method), None, None, None) => {
            let params = message.get("params").cloned().unwrap_or(Value::Null);
            let _ = inbound_tx.send(AppServerInbound::Notification {
                runtime_id: runtime_id.to_string(),
                method: method.to_string(),
                params,
            });
        }
        (None, Some(id), Some(result), _) => {
            let key = json_string(&id);
            if let Some(sender) = running.pending.lock().await.remove(&key) {
                let _ = sender.send(Ok(result));
            }
        }
        (None, Some(id), _, Some(error_value)) => {
            let key = json_string(&id);
            if let Some(sender) = running.pending.lock().await.remove(&key) {
                let payload = serde_json::from_value::<RpcErrorPayload>(error_value)?;
                let _ = sender.send(Err(payload));
            }
        }
        _ => {
            warn!("收到未知 app-server 消息: {line}");
        }
    }

    Ok(())
}

/// Empties the pending-request table, completing every entry with an error
/// payload (code `-32001`) that carries `message`.
///
/// Send failures are ignored.
async fn fail_pending_requests(running: &RunningAppServer, message: &str) {
    let mut table = running.pending.lock().await;
    table.drain().for_each(|(_key, reply_tx)| {
        let payload = RpcErrorPayload {
            code: -32001,
            message: message.to_owned(),
            data: None,
        };
        let _ = reply_tx.send(Err(payload));
    });
}