codex-mobile-bridge 0.2.6

Remote bridge and service manager for codex-mobile.
Documentation
use std::sync::Arc;

use anyhow::Result;
use serde_json::json;
use tokio::sync::mpsc;

use super::helpers::handshake_summary;
use super::notifications::handle_notification;
use super::server_requests::handle_server_request;
use crate::app_server::AppServerInbound;
use crate::bridge_protocol::{RuntimeStatusSnapshot, now_millis};
use crate::state::BridgeState;

pub(super) async fn run_app_server_event_loop(
    state: Arc<BridgeState>,
    mut inbound_rx: mpsc::UnboundedReceiver<AppServerInbound>,
) {
    while let Some(message) = inbound_rx.recv().await {
        if let Err(error) = handle_app_server_message(&state, message).await {
            let _ = state.emit_event(
                "error",
                None,
                None,
                json!({
                    "message": error.to_string(),
                }),
            );
        }
    }
}

pub(super) async fn handle_app_server_message(
    state: &BridgeState,
    message: AppServerInbound,
) -> Result<()> {
    match message {
        AppServerInbound::Starting { runtime_id } => {
            state
                .transition_runtime_status(
                    &runtime_id,
                    "starting".to_string(),
                    None,
                    handshake_summary("starting", false, Vec::new(), None),
                )
                .await
        }
        AppServerInbound::ProcessChanged {
            runtime_id,
            pid,
            running,
        } => {
            state
                .emit_runtime_process_changed(&runtime_id, pid, running)
                .await
        }
        AppServerInbound::Initializing {
            runtime_id,
            experimental_api_enabled,
            opt_out_notification_methods,
        } => {
            state
                .transition_runtime_status(
                    &runtime_id,
                    "starting".to_string(),
                    None,
                    handshake_summary(
                        "initializing",
                        experimental_api_enabled,
                        opt_out_notification_methods,
                        Some("已发送 initialize,等待握手完成".to_string()),
                    ),
                )
                .await
        }
        AppServerInbound::Initialized {
            runtime_id,
            info,
            experimental_api_enabled,
            opt_out_notification_methods,
        } => {
            let current = state.require_runtime(Some(&runtime_id)).await?;
            let current_status = current.status.read().await.clone();
            state
                .emit_runtime_status(
                    &runtime_id,
                    RuntimeStatusSnapshot {
                        runtime_id: runtime_id.clone(),
                        status: "running".to_string(),
                        codex_home: Some(info.codex_home),
                        user_agent: Some(info.user_agent),
                        platform_family: Some(info.platform_family),
                        platform_os: Some(info.platform_os),
                        last_error: None,
                        pid: current_status.pid,
                        app_server_handshake: handshake_summary(
                            "ready",
                            experimental_api_enabled,
                            opt_out_notification_methods,
                            Some("握手完成,initialized 已发送".to_string()),
                        ),
                        updated_at_ms: now_millis(),
                    },
                )
                .await
        }
        AppServerInbound::HandshakeFailed {
            runtime_id,
            message,
            experimental_api_enabled,
            opt_out_notification_methods,
        } => {
            state
                .transition_runtime_status(
                    &runtime_id,
                    "error".to_string(),
                    Some(message.clone()),
                    handshake_summary(
                        "failed",
                        experimental_api_enabled,
                        opt_out_notification_methods,
                        Some(message.clone()),
                    ),
                )
                .await?;
            state.emit_runtime_degraded(&runtime_id, message).await
        }
        AppServerInbound::Exited {
            runtime_id,
            message,
            expected,
        } => {
            let current = state.require_runtime(Some(&runtime_id)).await?;
            let current_status = current.status.read().await.clone();
            let handshake = if expected {
                crate::bridge_protocol::AppServerHandshakeSummary::inactive()
            } else if current_status.app_server_handshake.state == "failed" {
                handshake_summary(
                    "failed",
                    current_status.app_server_handshake.experimental_api_enabled,
                    current_status
                        .app_server_handshake
                        .opt_out_notification_methods
                        .clone(),
                    current_status
                        .app_server_handshake
                        .detail
                        .clone()
                        .or_else(|| Some(message.clone())),
                )
            } else {
                handshake_summary(
                    "failed",
                    current_status.app_server_handshake.experimental_api_enabled,
                    current_status
                        .app_server_handshake
                        .opt_out_notification_methods
                        .clone(),
                    Some(message.clone()),
                )
            };
            state
                .emit_runtime_status(
                    &runtime_id,
                    RuntimeStatusSnapshot {
                        runtime_id: runtime_id.clone(),
                        status: if expected {
                            "stopped".to_string()
                        } else {
                            "error".to_string()
                        },
                        codex_home: current_status.codex_home,
                        user_agent: current_status.user_agent,
                        platform_family: current_status.platform_family,
                        platform_os: current_status.platform_os,
                        last_error: if expected {
                            None
                        } else if current_status.last_error.is_some() {
                            current_status.last_error
                        } else {
                            Some(message.clone())
                        },
                        pid: None,
                        app_server_handshake: handshake,
                        updated_at_ms: now_millis(),
                    },
                )
                .await?;

            if !expected {
                state.emit_runtime_degraded(&runtime_id, message).await?;
            }

            Ok(())
        }
        AppServerInbound::Notification {
            runtime_id,
            method,
            params,
        } => handle_notification(state, &runtime_id, &method, params).await,
        AppServerInbound::ServerRequest {
            runtime_id,
            id,
            method,
            params,
        } => handle_server_request(state, &runtime_id, id, &method, params).await,
        AppServerInbound::LogChunk {
            runtime_id,
            stream,
            level,
            source,
            message,
            detail,
            occurred_at_ms,
        } => state.emit_event(
            "app_server_log_chunk",
            Some(&runtime_id),
            None,
            json!({
                "runtimeId": runtime_id,
                "stream": stream,
                "level": level,
                "source": source,
                "message": message,
                "detail": detail,
                "occurredAtMs": occurred_at_ms,
            }),
        ),
    }
}