codex-mobile-bridge 0.3.3

Remote bridge and service manager for codex-mobile.
Documentation
use std::collections::HashMap;
use std::path::Path;

use anyhow::{Context, Result};
use semver::Version;

use super::*;
use crate::bridge_protocol::{
    BridgeManagementPhase, ManagedBridgeReleaseDescriptor, ManagedBridgeSnapshot,
    ManagedInstallRecord, ManagedServiceStatus, now_millis,
};
use crate::storage::Storage;

mod health;
mod inspect;
mod runner;

use health::{read_local_bridge_health, success_detail, wait_for_expected_health};
use runner::BridgeManagementRunner;

/// Default cargo executable name used by bridge management; it is passed to
/// `release::resolve_latest_registry_version` when a snapshot is built.
pub(super) const DEFAULT_MANAGEMENT_CARGO_BINARY: &str = "cargo";
/// Default registry name used by bridge management; it is passed to
/// `release::resolve_latest_registry_version` and reported as the `registry`
/// of the available release descriptor in a snapshot.
pub(super) const DEFAULT_MANAGEMENT_REGISTRY: &str = "crates-io";

pub(super) fn spawn_bridge_management_task(task_id: &str, db_path: &Path) -> Result<()> {
    let current_exe = env::current_exe()
        .context("读取当前 bridge 可执行文件路径失败")?
        .canonicalize()
        .context("解析当前 bridge 可执行文件路径失败")?;
    let unit_suffix = task_id
        .chars()
        .filter(|value| value.is_ascii_alphanumeric())
        .take(12)
        .collect::<String>();
    let unit_name = if unit_suffix.is_empty() {
        "codex-mobile-bridge-manage".to_string()
    } else {
        format!("codex-mobile-bridge-manage-{unit_suffix}")
    };
    let mut command = vec![
        "systemd-run".to_string(),
        "--user".to_string(),
        "--collect".to_string(),
        "--quiet".to_string(),
        format!("--unit={unit_name}"),
    ];
    if let Ok(cwd) = env::current_dir() {
        command.push(format!("--working-directory={}", cwd.to_string_lossy()));
    }
    for key in ["HOME", "PATH", "CODEX_BINARY", "CODEX_HOME", "RUST_LOG"] {
        if let Ok(value) = env::var(key) {
            if !value.trim().is_empty() {
                command.push(format!("--setenv={key}={value}"));
            }
        }
    }
    command.extend([
        current_exe.to_string_lossy().to_string(),
        "manage".to_string(),
        "run-task".to_string(),
        "--task-id".to_string(),
        task_id.to_string(),
        "--db-path".to_string(),
        db_path.to_string_lossy().to_string(),
    ]);

    let shell_command = command
        .iter()
        .map(|value| environment::shell_quote_value(value))
        .collect::<Vec<_>>()
        .join(" ");
    environment::run_shell_capture(&shell_command)?;
    Ok(())
}

pub(super) fn run_bridge_management_task(args: RunTaskArgs) -> Result<()> {
    let storage = Storage::open_existing(args.db_path.clone())?;
    let task = storage
        .get_bridge_management_task(&args.task_id)?
        .with_context(|| format!("未找到 bridge 管理任务: {}", args.task_id))?;
    let runner = BridgeManagementRunner::new(storage, task);
    runner.run()
}

/// Produces the remote inspection report by delegating to
/// `inspect::inspect_remote_state`.
///
/// `_db_path` is accepted but not used by this function.
pub(super) fn inspect_remote_state(
    _db_path: &Path,
) -> Result<crate::bridge_protocol::RemoteInspectionReport> {
    inspect::inspect_remote_state()
}

pub(super) fn current_bridge_management_snapshot(_db_path: &Path) -> Result<ManagedBridgeSnapshot> {
    let paths = release::managed_paths()?;
    let mut warnings: Vec<String> = Vec::new();
    let install_record =
        environment::read_install_record(&paths)?.map(managed_install_record_from_record);
    let (service_status, service_warning) = inspect_service_status()?;
    if let Some(warning) = service_warning {
        warnings.push(warning);
    }
    let current_link_target = canonical_string_path(&paths.current_link);
    let current_binary_sha256 = current_link_target
        .as_deref()
        .map(Path::new)
        .map(read_release_binary_sha256)
        .transpose()
        .map_err(|error| anyhow::anyhow!("读取当前 release sha256 失败: {error}"))?;
    let unit_file_exists = paths.unit_file.is_file();
    let env_file_exists = paths.env_file.is_file();

    let listen_addr = managed_listen_addr(&paths)?;
    let bridge_health = match read_local_bridge_health(&listen_addr) {
        Ok(health) => Some(health),
        Err(error) => {
            if service_status.active {
                warnings.push(format!("本地 /health 检查失败: {error}"));
            }
            None
        }
    };
    let available_release = match release::resolve_latest_registry_version(
        &paths,
        DEFAULT_MANAGEMENT_CARGO_BINARY,
        DEFAULT_MANAGEMENT_REGISTRY,
    ) {
        Ok(version) => Some(ManagedBridgeReleaseDescriptor {
            crate_name: CRATE_NAME.to_string(),
            version,
            registry: DEFAULT_MANAGEMENT_REGISTRY.to_string(),
        }),
        Err(error) => {
            warnings.push(format!("查询最新 bridge release 失败: {error}"));
            None
        }
    };

    let observed_bridge_version = bridge_health
        .as_ref()
        .and_then(|snapshot| snapshot.bridge_version.clone())
        .or_else(|| {
            install_record
                .as_ref()
                .and_then(|record| record.current_version.clone())
        });
    let current_release_expected = install_record
        .as_ref()
        .and_then(|record| record.current_release_path.clone())
        .filter(|value| !value.trim().is_empty());
    let current_sha_expected = install_record
        .as_ref()
        .and_then(|record| record.current_sha256.clone())
        .filter(|value| !value.trim().is_empty());
    let can_update = available_release
        .as_ref()
        .and_then(|release| {
            observed_bridge_version
                .as_deref()
                .map(|current| is_version_newer(&release.version, current))
        })
        .unwrap_or(false);
    let needs_repair = install_record
        .as_ref()
        .is_some_and(|record| record.install_state == "installed")
        && (!service_status.installed
            || !unit_file_exists
            || !env_file_exists
            || current_link_target.is_none()
            || current_release_expected.is_none()
            || current_link_target != current_release_expected
            || matches!(
                (current_sha_expected.as_ref(), current_binary_sha256.as_ref()),
                (Some(expected), Some(observed)) if observed != expected
            ));
    let can_rollback = install_record.as_ref().is_some_and(|record| {
        record
            .previous_release_path
            .as_ref()
            .is_some_and(|value| !value.trim().is_empty())
            || record
                .previous_version
                .as_ref()
                .is_some_and(|value| !value.trim().is_empty())
    });

    Ok(ManagedBridgeSnapshot {
        available_release,
        install_record,
        service_status,
        bridge_health,
        current_link_target,
        current_binary_sha256,
        unit_file_exists,
        env_file_exists,
        can_update,
        needs_repair,
        can_rollback,
        warnings,
    })
}

fn managed_install_record_from_record(record: InstallRecord) -> ManagedInstallRecord {
    ManagedInstallRecord {
        install_state: record.install_state,
        current_artifact_id: record.current_artifact_id,
        current_version: record.current_version,
        current_build_hash: record.current_build_hash,
        current_sha256: record.current_sha256,
        current_protocol_version: record.current_protocol_version.map(|value| value as i32),
        current_release_path: record.current_release_path,
        previous_artifact_id: record.previous_artifact_id,
        previous_version: record.previous_version,
        previous_build_hash: record.previous_build_hash,
        previous_sha256: record.previous_sha256,
        previous_protocol_version: record.previous_protocol_version.map(|value| value as i32),
        previous_release_path: record.previous_release_path,
        last_operation: record.last_operation,
        last_operation_status: record.last_operation_status,
        last_operation_at_ms: record.last_operation_at_ms,
        installed_at_ms: record.installed_at_ms,
        updated_at_ms: record.updated_at_ms,
    }
}

/// Queries `systemctl --user show` for the bridge service and summarizes it.
///
/// Returns the parsed [`ManagedServiceStatus`] together with an optional
/// warning. The warning is set when systemctl printed no `KEY=VALUE` lines at
/// all but did write something to stderr; the status is then built from empty
/// fields (`installed`, `active` and `enabled` all `false`).
///
/// # Errors
///
/// Fails only when `environment::run_shell_capture` itself returns an error;
/// the trailing `|| true` makes the shell command exit successfully even when
/// systemctl fails.
pub(super) fn inspect_service_status() -> Result<(ManagedServiceStatus, Option<String>)> {
    // stderr must NOT be redirected to /dev/null here: the warning below is
    // built from `capture.stderr`, and a redirect would leave it permanently
    // empty, making the warning unreachable. `|| true` alone is enough to keep
    // a failing systemctl from turning into a command failure.
    let capture = environment::run_shell_capture(&format!(
        "systemctl --user show {SERVICE_NAME} --property=LoadState --property=ActiveState --property=UnitFileState --property=SubState --property=Result --property=ExecMainStatus --no-pager || true"
    ))?;
    let fields = parse_key_value_lines(&capture.stdout);
    // Missing properties are reported as empty strings.
    let field = |key: &str| fields.get(key).cloned().unwrap_or_default();

    let load_state = field("LoadState");
    let active_state = field("ActiveState");
    let unit_file_state = field("UnitFileState");
    let sub_state = field("SubState");
    let result = field("Result");
    // A non-numeric or missing ExecMainStatus becomes `None`.
    let exec_main_status = fields
        .get("ExecMainStatus")
        .and_then(|value| value.parse::<i32>().ok());

    let stderr = capture.stderr.trim();
    let warning = (fields.is_empty() && !stderr.is_empty())
        .then(|| format!("读取 systemd user service 状态失败: {stderr}"));

    Ok((
        ManagedServiceStatus {
            installed: load_state == "loaded",
            active: active_state == "active",
            enabled: unit_file_state == "enabled",
            load_state,
            active_state,
            unit_file_state,
            sub_state,
            result,
            exec_main_status,
            checked_at_ms: now_millis(),
        },
        warning,
    ))
}

/// Parses `KEY=VALUE` lines into a map.
///
/// Each line is trimmed and then split at its first `=`; lines without an `=`
/// (including blank ones) are skipped. Keys and values are not trimmed any
/// further, and a key that appears more than once keeps its last value.
fn parse_key_value_lines(raw: &str) -> HashMap<String, String> {
    let mut fields = HashMap::new();
    for line in raw.lines() {
        let entry = line.trim();
        if let Some((key, value)) = entry.split_once('=') {
            fields.insert(key.to_owned(), value.to_owned());
        }
    }
    fields
}

/// Canonicalizes `path` and returns it as a (lossily converted) string.
///
/// Returns `None` when canonicalization fails for any reason; the underlying
/// error is discarded.
pub(super) fn canonical_string_path(path: &Path) -> Option<String> {
    match path.canonicalize() {
        Ok(resolved) => Some(resolved.to_string_lossy().into_owned()),
        Err(_) => None,
    }
}

/// Computes the sha256 of the bridge binary inside the release at `release_root`.
///
/// The binary is located with `environment::release_binary_path` and hashed
/// with `records::sha256_file`.
///
/// # Errors
///
/// Fails when the release has no bridge binary (the error names the release
/// root) or when hashing the file fails.
fn read_release_binary_sha256(release_root: &Path) -> Result<String> {
    let located = environment::release_binary_path(release_root);
    let binary_path = located
        .with_context(|| format!("release 缺少 bridge 二进制: {}", release_root.display()))?;
    records::sha256_file(&binary_path)
}

/// Returns the listen address configured for the managed bridge.
///
/// Reads the managed env file and uses the trimmed value of
/// `CODEX_MOBILE_LISTEN_ADDR`; when the key is missing or blank, falls back to
/// `DEFAULT_LISTEN_ADDR`.
///
/// # Errors
///
/// Fails when `environment::read_env_file` returns an error.
pub(super) fn managed_listen_addr(paths: &ManagedPaths) -> Result<String> {
    let env_values = environment::read_env_file(paths)?;
    let configured = env_values
        .get("CODEX_MOBILE_LISTEN_ADDR")
        .map(|raw| raw.trim());
    let listen_addr = match configured {
        Some(value) if !value.is_empty() => value,
        _ => DEFAULT_LISTEN_ADDR,
    };
    Ok(listen_addr.to_string())
}

/// Maps a management phase to the failure code string reported when that
/// phase fails.
///
/// `Done` maps to the generic `"bridge_management_failed"` code.
pub(super) fn failure_code_for_phase(phase: &BridgeManagementPhase) -> &'static str {
    // Local alias purely to keep the match arms short.
    type Phase = BridgeManagementPhase;

    match *phase {
        Phase::ResolveTarget => "resolve_target_failed",
        Phase::InstallRelease => "install_release_failed",
        Phase::ActivateRelease => "activate_release_failed",
        Phase::ControlService => "control_service_failed",
        Phase::CleanupManagedFiles => "cleanup_managed_files_failed",
        Phase::RestartService => "restart_service_failed",
        Phase::VerifyHealth => "verify_health_failed",
        Phase::Done => "bridge_management_failed",
    }
}

/// Reports whether `candidate` is a strictly newer semantic version than
/// `current`.
///
/// Both strings are parsed with `parse_semver`; if either fails to parse the
/// answer is `false`. `current` is only parsed once `candidate` has parsed
/// successfully.
pub(super) fn is_version_newer(candidate: &str, current: &str) -> bool {
    match parse_semver(candidate) {
        Ok(newer) => parse_semver(current).is_ok_and(|installed| newer > installed),
        Err(_) => false,
    }
}

/// Parses a version string into a [`Version`].
///
/// Surrounding whitespace is trimmed and every leading `v` is stripped before
/// parsing (so `"v1.2.3"` and `" 1.2.3 "` are both accepted).
///
/// # Errors
///
/// Fails when the normalized text is not a valid semantic version; the error
/// context includes the original, un-normalized `value`.
fn parse_semver(value: &str) -> Result<Version> {
    let normalized = value.trim().trim_start_matches('v');
    let parsed = Version::parse(normalized);
    parsed.with_context(|| format!("解析版本号失败: {value}"))
}