codex-mobile-bridge 0.3.2

Remote bridge and service manager for codex-mobile.
Documentation
use std::env;
use std::path::Path;

use anyhow::Result;

use super::*;
use crate::bridge_protocol::{
    BridgeHealthSnapshot, DiagnosticLogEntry, PlatformDescriptor, RemoteInspectionReport,
    RemoteInstallRecord, RemoteLogChunk, RemoteRegistryStatus, RemoteServiceStatus,
    RemoteToolchainStatus,
};

/// Number of journal lines requested per remote log read (`journalctl -n`).
///
/// The same value is the threshold at or above which a collected snapshot is
/// flagged as truncated.
const REMOTE_LOG_LINE_LIMIT: usize = 120;

/// Assembles a full inspection report for the remote bridge installation.
///
/// Gathers the management snapshot (relative to the current directory), the
/// toolchain status, the registry status, the systemd user-service status and
/// the recent journal output, then folds them into one
/// [`RemoteInspectionReport`].
///
/// Warnings are the snapshot's own warnings followed by, in order: the
/// toolchain reason (if non-blank), the registry error (if non-blank), and a
/// note when the service is active but no bridge health data is available.
///
/// # Errors
///
/// Propagates any error from reading the management snapshot, querying the
/// service status, or reading the log snapshot.
pub(super) fn inspect_remote_state() -> Result<RemoteInspectionReport> {
    // Collection order is kept fixed: several of these steps run shell commands.
    let snapshot = current_bridge_management_snapshot(Path::new("."))?;
    let toolchain_status = detect_toolchain_status();
    let registry_status = build_registry_status(&snapshot);
    let service_status = inspect_remote_service_status()?;
    let logs = read_remote_log_snapshot()?;

    // Blank or whitespace-only messages are not worth surfacing as warnings.
    let is_meaningful = |text: &String| !text.trim().is_empty();
    let mut warnings = snapshot.warnings.clone();
    warnings.extend(toolchain_status.reason.clone().filter(is_meaningful));
    warnings.extend(registry_status.error.clone().filter(is_meaningful));
    if snapshot.bridge_health.is_none() && service_status.active {
        warnings.push("Bridge 健康端点暂不可直连".to_string());
    }

    let platform_supported = cfg!(unix);
    let detected_platform = PlatformDescriptor {
        os: env::consts::OS.to_string(),
        arch: env::consts::ARCH.to_string(),
        supported: platform_supported,
        reason: (!platform_supported).then(|| "当前平台暂不支持 bridge 自主管理".to_string()),
    };

    // Re-shape the snapshot's install record into the report's record type.
    let install_record = snapshot.install_record.map(|source| RemoteInstallRecord {
        install_state: source.install_state,
        current_artifact_id: source.current_artifact_id,
        current_version: source.current_version,
        current_build_hash: source.current_build_hash,
        current_sha256: source.current_sha256,
        current_protocol_version: source.current_protocol_version,
        current_release_path: source.current_release_path,
        previous_artifact_id: source.previous_artifact_id,
        previous_version: source.previous_version,
        previous_build_hash: source.previous_build_hash,
        previous_sha256: source.previous_sha256,
        previous_protocol_version: source.previous_protocol_version,
        previous_release_path: source.previous_release_path,
        last_operation: source.last_operation,
        last_operation_status: source.last_operation_status,
        last_operation_at_ms: source.last_operation_at_ms,
        installed_at_ms: source.installed_at_ms,
        updated_at_ms: source.updated_at_ms,
    });

    let bridge_health = snapshot.bridge_health.map(|probe| BridgeHealthSnapshot {
        ok: probe.ok,
        bridge_version: probe.bridge_version,
        build_hash: probe.build_hash,
        protocol_version: probe.protocol_version,
        runtime_count: probe.runtime_count,
        primary_runtime_id: probe.primary_runtime_id,
        runtime: probe.runtime,
    });

    Ok(RemoteInspectionReport {
        detected_platform,
        toolchain_status,
        registry_status,
        available_release: snapshot.available_release,
        install_record,
        service_status,
        bridge_health,
        current_link_target: snapshot.current_link_target,
        current_binary_sha256: snapshot.current_binary_sha256,
        unit_file_exists: snapshot.unit_file_exists,
        env_file_exists: snapshot.env_file_exists,
        can_update: snapshot.can_update,
        needs_repair: snapshot.needs_repair,
        can_rollback: snapshot.can_rollback,
        warnings,
        app_server_log: logs.app_server,
        bridge_service_log: logs.bridge_service,
        systemd_log: logs.systemd,
        recent_log: logs.combined,
    })
}

/// Probes the host for the tools needed to build and manage the bridge.
///
/// Looks up `cargo` and `rustc`, the first available of `cc`, `clang`, `gcc`
/// and `c++`, and checks whether `systemctl --user show-environment` can be
/// run successfully through the shell helper.
///
/// `reason` is `None` when everything is available; otherwise it lists every
/// missing piece, separated by a full-width semicolon (`；`) so that multiple
/// findings stay readable instead of running together.
fn detect_toolchain_status() -> RemoteToolchainStatus {
    let cargo_path = lookup_command("cargo");
    let rustc_path = lookup_command("rustc");
    let compiler_path = ["cc", "clang", "gcc", "c++"]
        .iter()
        .find_map(|name| lookup_command(name));
    let systemd_user_available =
        environment::run_shell_capture("systemctl --user show-environment >/dev/null 2>&1").is_ok();

    let cargo_available = cargo_path.is_some();
    let rustc_available = rustc_path.is_some();
    let compiler_available = compiler_path.is_some();

    // One entry per missing prerequisite, in a stable order.
    let missing: Vec<&str> = [
        (!cargo_available).then_some("缺少 cargo"),
        (!rustc_available).then_some("缺少 rustc"),
        (!compiler_available).then_some("缺少 C 编译器"),
        (!systemd_user_available).then_some("systemd --user 不可用"),
    ]
    .into_iter()
    .flatten()
    .collect();
    // An empty list means every prerequisite was found, hence no reason.
    let reason = (!missing.is_empty()).then(|| missing.join("；"));

    RemoteToolchainStatus {
        cargo_available,
        rustc_available,
        compiler_available,
        systemd_user_available,
        cargo_path,
        rustc_path,
        compiler_path,
        reason,
    }
}

/// Resolves `binary` through the shell's `command -v`.
///
/// Returns the trimmed standard output when it is non-empty, and `None` when
/// the shell helper fails or prints nothing.
///
/// `binary` is interpolated into the shell command verbatim, so it must be a
/// trusted name.
fn lookup_command(binary: &str) -> Option<String> {
    let probe = format!("command -v {binary} 2>/dev/null || true");
    let capture = environment::run_shell_capture(&probe).ok()?;
    let resolved = capture.stdout.trim();
    (!resolved.is_empty()).then(|| resolved.to_string())
}

/// Derives the registry status from an already collected management snapshot.
///
/// The registry counts as reachable exactly when the snapshot carries an
/// available release, whose version becomes `latest_version`. `error` is the
/// first snapshot warning containing the release-lookup failure marker, if any.
fn build_registry_status(
    snapshot: &crate::bridge_protocol::ManagedBridgeSnapshot,
) -> RemoteRegistryStatus {
    let reachable = snapshot.available_release.is_some();
    let latest_version = snapshot
        .available_release
        .as_ref()
        .map(|release| release.version.clone());
    let error = snapshot.warnings.iter().find_map(|warning| {
        warning
            .contains("查询最新 bridge release 失败")
            .then(|| warning.clone())
    });

    RemoteRegistryStatus {
        crate_name: CRATE_NAME.to_string(),
        registry: DEFAULT_MANAGEMENT_REGISTRY.to_string(),
        reachable,
        latest_version,
        error,
    }
}

/// Queries the systemd user manager for the state of the bridge service.
///
/// Runs `systemctl --user show` for a fixed set of properties and
/// `loginctl show-user` for the current user's linger setting. Both commands
/// discard stderr and are forced to succeed (`|| true`), so a missing or
/// failing tool yields empty fields rather than an error.
///
/// The `KEY=VALUE` output is parsed line by line; lines without `=` are
/// ignored and missing properties default to an empty string. `last_message`
/// joins the non-blank state strings with `" / "`.
///
/// # Errors
///
/// Returns an error only if the shell helper itself fails.
fn inspect_remote_service_status() -> Result<RemoteServiceStatus> {
    // `--no-pager` is the documented spelling; the previous `--no-page` only
    // worked through getopt's unambiguous-prefix matching.
    let capture = environment::run_shell_capture(&format!(
        "systemctl --user show {SERVICE_NAME} --property=LoadState --property=ActiveState --property=UnitFileState --property=SubState --property=Result --property=FragmentPath --property=ExecMainPID --property=ExecMainStatus --no-pager 2>/dev/null || true"
    ))?;
    let linger = environment::run_shell_capture(
        "loginctl show-user \"$(id -un)\" --property=Linger --value 2>/dev/null || true",
    )?;

    let fields = capture
        .stdout
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .filter_map(|line| {
            let (key, value) = line.split_once('=')?;
            Some((key.to_string(), value.to_string()))
        })
        .collect::<HashMap<_, _>>();
    // Absent properties are treated as empty strings.
    let field = |key: &str| fields.get(key).cloned().unwrap_or_default();

    let load_state = field("LoadState");
    let active_state = field("ActiveState");
    let unit_file_state = field("UnitFileState");
    let sub_state = field("SubState");
    let result = field("Result");

    // Built from borrows first so the owned strings can move into the struct
    // afterwards without cloning.
    let last_message = [
        &load_state,
        &active_state,
        &unit_file_state,
        &sub_state,
        &result,
    ]
    .into_iter()
    .filter(|value| !value.trim().is_empty())
    .map(String::as_str)
    .collect::<Vec<_>>()
    .join(" / ");

    Ok(RemoteServiceStatus {
        installed: load_state == "loaded",
        active: active_state == "active",
        enabled: unit_file_state == "enabled",
        lingering_enabled: linger.stdout.trim() == "yes",
        load_state,
        active_state,
        unit_file_state,
        sub_state,
        result,
        fragment_path: fields
            .get("FragmentPath")
            .filter(|value| !value.is_empty())
            .cloned(),
        exec_main_pid: fields
            .get("ExecMainPID")
            .and_then(|value| value.parse::<i64>().ok()),
        exec_main_status: fields
            .get("ExecMainStatus")
            .and_then(|value| value.parse::<i32>().ok()),
        last_message,
        checked_at_ms: now_millis(),
    })
}

/// Journal output for the bridge service, split by where each line came from.
#[derive(Default)]
struct RemoteLogSnapshot {
    /// Every parsed entry, regardless of source.
    combined: RemoteLogChunk,
    /// Entries whose source was classified as `app-server`.
    app_server: RemoteLogChunk,
    /// Entries that are neither `app-server` nor `systemd`.
    bridge_service: RemoteLogChunk,
    /// Entries whose source was classified as `systemd`.
    systemd: RemoteLogChunk,
}

/// Reads the most recent journal lines for the bridge user service and
/// classifies them into a [`RemoteLogSnapshot`].
///
/// stderr is merged into stdout and the command is forced to succeed, so any
/// `journalctl` diagnostics are captured as ordinary log lines.
///
/// # Errors
///
/// Returns an error only if the shell helper itself fails.
fn read_remote_log_snapshot() -> Result<RemoteLogSnapshot> {
    let command = format!(
        "journalctl --user -u {SERVICE_NAME} -n {REMOTE_LOG_LINE_LIMIT} --no-pager --output=short-iso 2>&1 || true"
    );
    let journal = environment::run_shell_capture(&command)?;
    let collected_at_ms = now_millis();
    Ok(build_remote_log_snapshot(&journal.stdout, collected_at_ms))
}

/// Splits raw journal text into per-source log chunks.
///
/// Each non-blank line (after trimming) becomes one entry stamped with
/// `collected_at_ms`. Entries are sorted into `app-server`, `systemd` and
/// "everything else" buckets, and all of them also go into the combined chunk.
/// The snapshot is flagged as truncated when the number of non-blank lines
/// reaches `REMOTE_LOG_LINE_LIMIT`.
fn build_remote_log_snapshot(raw_text: &str, collected_at_ms: i64) -> RemoteLogSnapshot {
    let mut combined = Vec::new();
    let mut app_server = Vec::new();
    let mut systemd = Vec::new();
    let mut bridge_service = Vec::new();

    let meaningful_lines = raw_text
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty());
    for line in meaningful_lines {
        let entry = parse_remote_log_entry(line, collected_at_ms);
        let bucket = match entry.source.as_str() {
            "app-server" => &mut app_server,
            "systemd" => &mut systemd,
            _ => &mut bridge_service,
        };
        bucket.push(entry.clone());
        combined.push(entry);
    }

    // One entry per non-blank line, so the entry count is the line count.
    let truncated = combined.len() >= REMOTE_LOG_LINE_LIMIT;
    let bucket_truncated = truncated && !combined.is_empty();

    RemoteLogSnapshot {
        combined: build_remote_log_chunk(combined, collected_at_ms, truncated),
        app_server: build_remote_log_chunk(app_server, collected_at_ms, bucket_truncated),
        bridge_service: build_remote_log_chunk(bridge_service, collected_at_ms, bucket_truncated),
        systemd: build_remote_log_chunk(systemd, collected_at_ms, bucket_truncated),
    }
}

/// Wraps a list of entries into a [`RemoteLogChunk`] with a rendered text view.
///
/// Each entry renders as `[LEVEL] message`, followed on its own line by the
/// entry's detail when that detail is present and non-blank. Rendered entries
/// are separated by a single newline.
fn build_remote_log_chunk(
    entries: Vec<DiagnosticLogEntry>,
    collected_at_ms: i64,
    truncated: bool,
) -> RemoteLogChunk {
    let mut text = String::new();
    for (index, entry) in entries.iter().enumerate() {
        // Separator goes between entries, never after the last one.
        if index > 0 {
            text.push('\n');
        }
        text.push_str(&format!(
            "[{}] {}",
            entry.level.to_uppercase(),
            entry.message
        ));
        match entry.detail.as_deref() {
            Some(detail) if !detail.trim().is_empty() => {
                text.push('\n');
                text.push_str(detail);
            }
            _ => {}
        }
    }

    RemoteLogChunk {
        text,
        collected_at_ms,
        entries,
        truncated,
    }
}

/// Classifies one journal line into a [`DiagnosticLogEntry`].
///
/// * `source` comes from the first matching marker in the line (`systemd[`,
///   `app-server stderr`/`app-server stdout`, `codex-mobile-bridge[`), falling
///   back to `remote`.
/// * `level` comes from a case-insensitive keyword scan: error keywords take
///   precedence over warning keywords, and the default is `info`.
/// * `code` is set for a few known source/message combinations and is empty
///   otherwise.
///
/// The whole line is kept as the message, and `fallback_timestamp_ms` is used
/// for both timestamps.
fn parse_remote_log_entry(line: &str, fallback_timestamp_ms: i64) -> DiagnosticLogEntry {
    // Markers are checked in order; the first hit wins.
    let source = [
        ("systemd[", "systemd"),
        ("app-server stderr", "app-server"),
        ("app-server stdout", "app-server"),
        ("codex-mobile-bridge[", "codex-mobile-bridge"),
    ]
    .iter()
    .find(|&&(marker, _)| line.contains(marker))
    .map_or("remote", |&(_, label)| label);

    let lowered = line.to_lowercase();
    let level = [
        (" error ", "error"),
        (" failed", "error"),
        (" warn ", "warn"),
        (" timeout", "warn"),
    ]
    .iter()
    .find(|&&(keyword, _)| lowered.contains(keyword))
    .map_or("info", |&(_, label)| label);

    // (required source, message marker, resulting code)
    let code = [
        (
            "systemd",
            "Started codex-mobile-bridge.service",
            "bridge_service_started",
        ),
        (
            "systemd",
            "Stopped codex-mobile-bridge.service",
            "bridge_service_stopped",
        ),
        ("codex-mobile-bridge", "listening on", "bridge_listening"),
        (
            "app-server",
            "等待 app-server 响应超时",
            "app_server_timeout",
        ),
    ]
    .iter()
    .find(|&&(origin, marker, _)| origin == source && line.contains(marker))
    .map_or("", |&(_, _, label)| label);

    DiagnosticLogEntry {
        level: level.to_string(),
        source: source.to_string(),
        code: code.to_string(),
        message: line.to_string(),
        detail: None,
        detail_samples: Vec::new(),
        first_at_ms: fallback_timestamp_ms,
        last_at_ms: fallback_timestamp_ms,
        repeat_count: 1,
    }
}