codex-mobile-bridge 0.3.3

Remote bridge and service manager for codex-mobile.
Documentation
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::thread;
use std::time::{Duration as StdDuration, Instant};

use anyhow::{Context, Result, bail};

use super::*;
use crate::bridge_protocol::ManagedBridgeHealthSnapshot;

/// Timeout applied separately to the connect, read and write phases of a single `/health` probe.
const HEALTH_CHECK_TIMEOUT: StdDuration = StdDuration::from_secs(3);
/// Total time budget `wait_for_expected_health` keeps polling for before it gives up.
const HEALTH_VERIFY_DEADLINE: StdDuration = StdDuration::from_secs(45);
/// Pause between consecutive polling attempts in `wait_for_expected_health`.
const HEALTH_VERIFY_INTERVAL: StdDuration = StdDuration::from_secs(1);

/// Fetches the `/health` snapshot of the bridge listening on `listen_addr`.
///
/// The listen address is resolved into one or more socket addresses; each is probed in order and
/// the first successful snapshot is returned.
///
/// # Errors
///
/// Propagates the error when the listen address cannot be resolved. When every resolved address
/// fails, the returned error carries the full cause chain of the last failed attempt.
pub(super) fn read_local_bridge_health(listen_addr: &str) -> Result<ManagedBridgeHealthSnapshot> {
    let mut last_error = None;
    for address in resolve_health_addresses(listen_addr)? {
        match read_health_from_socket(address) {
            Ok(health) => return Ok(health),
            // `{:#}` renders the whole anyhow cause chain; plain `to_string()` would keep only
            // the outermost context and drop the underlying I/O or JSON error.
            Err(error) => last_error = Some(format!("{error:#}")),
        }
    }
    bail!(
        "无法读取本地 bridge /health: {}",
        last_error.unwrap_or_else(|| "没有可用地址".to_string())
    )
}

/// Resolves `listen_addr` into the concrete socket addresses a health probe should try.
///
/// The address is first rewritten by `normalize_health_target` and then resolved through
/// `ToSocketAddrs`.
///
/// # Errors
///
/// Fails when normalization fails, when resolution fails, or when resolution yields no address.
fn resolve_health_addresses(listen_addr: &str) -> Result<Vec<SocketAddr>> {
    let normalized = normalize_health_target(listen_addr)?;
    let resolved: Vec<SocketAddr> = normalized
        .to_socket_addrs()
        .with_context(|| format!("解析 bridge 监听地址失败: {normalized}"))?
        .collect();
    if !resolved.is_empty() {
        return Ok(resolved);
    }
    bail!("bridge 监听地址没有可用解析结果: {normalized}")
}

/// Rewrites a listen address into a `host:port` target that can be connected to locally.
///
/// Wildcard hosts are replaced by loopback: `0.0.0.0`, an empty host and `*` become `127.0.0.1`,
/// and `::` becomes `::1`. A host containing `:` is wrapped in brackets in the result. Input that
/// already starts with `[` is parsed as `[host]:port`.
///
/// # Errors
///
/// Fails when a bracketed address has no `]:` separator, or when an unbracketed address contains
/// no `:` at all.
fn normalize_health_target(listen_addr: &str) -> Result<String> {
    match listen_addr.strip_prefix('[') {
        // Bracketed form: everything up to "]:" is the host.
        Some(bracketed) => {
            let (raw_host, port) = bracketed
                .split_once("]:")
                .context("解析 IPv6 监听地址失败")?;
            let host = match raw_host {
                "::" => "::1",
                other => other,
            };
            Ok(format!("[{host}]:{port}"))
        }
        // Unbracketed form: the port follows the last ':' so bare IPv6 hosts still split.
        None => {
            let (raw_host, port) = listen_addr
                .rsplit_once(':')
                .context("监听地址缺少端口")?;
            let normalized_host = if matches!(raw_host, "0.0.0.0" | "" | "*") {
                "127.0.0.1"
            } else if raw_host == "::" {
                "::1"
            } else {
                raw_host
            };
            let target = if normalized_host.contains(':') {
                format!("[{normalized_host}]:{port}")
            } else {
                format!("{normalized_host}:{port}")
            };
            Ok(target)
        }
    }
}

/// Performs one blocking `GET /health` request against `address` and decodes the JSON body.
///
/// The connect, read and write phases are each bounded by `HEALTH_CHECK_TIMEOUT`. The request
/// sends `Connection: close`, and the response is read until end of stream.
///
/// # Errors
///
/// Fails when the connection cannot be established, a socket timeout cannot be configured, the
/// request cannot be written, the response cannot be read or is not valid UTF-8, the blank line
/// separating headers from body is missing, the status code is not `200`, or the body does not
/// deserialize into a `ManagedBridgeHealthSnapshot`.
fn read_health_from_socket(address: SocketAddr) -> Result<ManagedBridgeHealthSnapshot> {
    let mut stream = TcpStream::connect_timeout(&address, HEALTH_CHECK_TIMEOUT)
        .with_context(|| format!("连接本地 /health 失败: {address}"))?;
    stream
        .set_read_timeout(Some(HEALTH_CHECK_TIMEOUT))
        .with_context(|| format!("设置读取超时失败: {address}"))?;
    stream
        .set_write_timeout(Some(HEALTH_CHECK_TIMEOUT))
        .with_context(|| format!("设置写入超时失败: {address}"))?;
    stream
        .write_all(b"GET /health HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
        .with_context(|| format!("写入 /health 请求失败: {address}"))?;

    let mut response = Vec::new();
    stream
        .read_to_end(&mut response)
        .with_context(|| format!("读取 /health 响应失败: {address}"))?;
    let response = String::from_utf8(response).context("解析 /health 响应失败: 非 UTF-8")?;

    // NOTE(review): the body is handed to the JSON parser exactly as received, so a chunked
    // transfer-encoded reply would not parse — confirm the bridge always sends a plain body.
    let (headers, body) = response
        .split_once("\r\n\r\n")
        .context("本地 /health 响应不完整")?;

    let status_line = headers.lines().next().unwrap_or_default();
    // Compare the status-code token itself (second whitespace-separated field). A substring
    // search for " 200 " rejects a status line without a reason phrase ("HTTP/1.1 200") and can
    // be fooled by " 200 " appearing inside the reason phrase of another status.
    let status_code = status_line.split_whitespace().nth(1);
    if status_code != Some("200") {
        bail!("本地 /health 返回非 200: {status_line}");
    }
    serde_json::from_str(body).context("解析 /health JSON 失败")
}

/// Polls the local bridge `/health` endpoint until it reports the expected state or the
/// `HEALTH_VERIFY_DEADLINE` budget is used up.
///
/// A snapshot is accepted when `health.ok` is true and, for each expectation that is `Some`, the
/// reported bridge version / protocol version equals it. An expectation of `None` is not checked.
/// The listen address is obtained from `managed_listen_addr(paths)`.
///
/// # Errors
///
/// Propagates a failure of `managed_listen_addr`. On timeout the error describes the last
/// observed problem: either the most recent probe failure (with its cause chain) or the most
/// recent snapshot that did not satisfy the expectations.
pub(super) fn wait_for_expected_health(
    paths: &ManagedPaths,
    expected_version: Option<&str>,
    expected_protocol: Option<i32>,
) -> Result<ManagedBridgeHealthSnapshot> {
    let listen_addr = managed_listen_addr(paths)?;
    let deadline = Instant::now() + HEALTH_VERIFY_DEADLINE;
    let mut last_error = None;
    while Instant::now() < deadline {
        match read_local_bridge_health(&listen_addr) {
            Ok(health) => {
                let version_ready = expected_version.is_none()
                    || health.bridge_version.as_deref() == expected_version;
                let protocol_ready =
                    expected_protocol.is_none() || health.protocol_version == expected_protocol;
                if health.ok && version_ready && protocol_ready {
                    return Ok(health);
                }
                // Report `ok` as well: a snapshot can match version and protocol and still be
                // rejected because the bridge reports itself as not healthy.
                last_error = Some(format!(
                    "health 已返回但尚未达到目标版本/协议: ok={} version={} protocol={}",
                    health.ok,
                    health.bridge_version.as_deref().unwrap_or("-"),
                    health
                        .protocol_version
                        .map(|value| value.to_string())
                        .unwrap_or_else(|| "-".to_string()),
                ));
            }
            // `{:#}` keeps the anyhow cause chain that `to_string()` would drop.
            Err(error) => last_error = Some(format!("{error:#}")),
        }
        // Never sleep past the deadline: once less than one interval is left, only wait for
        // the remainder so the timeout is reported promptly.
        let remaining = deadline.saturating_duration_since(Instant::now());
        thread::sleep(HEALTH_VERIFY_INTERVAL.min(remaining));
    }
    bail!(
        "等待 bridge /health 就绪超时: {}",
        last_error.unwrap_or_else(|| "未知原因".to_string())
    )
}

/// Renders a one-line summary of a health snapshot: bridge version, protocol version and
/// runtime count. A missing version or protocol is shown as `-`.
pub(super) fn success_detail(health: &ManagedBridgeHealthSnapshot) -> String {
    let version = match health.bridge_version.as_deref() {
        Some(value) => value,
        None => "-",
    };
    let protocol = match health.protocol_version {
        Some(value) => value.to_string(),
        None => "-".to_string(),
    };
    format!(
        "bridgeVersion={} protocol={} runtimes={}",
        version, protocol, health.runtime_count,
    )
}