use std::collections::HashMap;
use std::path::Path;
use anyhow::{Context, Result};
use semver::Version;
use super::*;
use crate::bridge_protocol::{
BridgeManagementPhase, ManagedBridgeReleaseDescriptor, ManagedBridgeSnapshot,
ManagedInstallRecord, ManagedServiceStatus, now_millis,
};
use crate::storage::Storage;
mod health;
mod inspect;
mod runner;
use health::{read_local_bridge_health, success_detail, wait_for_expected_health};
use runner::BridgeManagementRunner;
/// Cargo binary name that this module passes to
/// `release::resolve_latest_registry_version` when looking up the latest
/// bridge release.
pub(super) const DEFAULT_MANAGEMENT_CARGO_BINARY: &str = "cargo";
/// Registry name that this module passes to
/// `release::resolve_latest_registry_version` and records in the resulting
/// `ManagedBridgeReleaseDescriptor`.
pub(super) const DEFAULT_MANAGEMENT_REGISTRY: &str = "crates-io";
/// Starts `manage run-task` for `task_id` in a transient systemd user unit.
///
/// The command line is assembled as a list of arguments, each argument is
/// shell-quoted, and the joined string is handed to
/// `environment::run_shell_capture`.
///
/// # Errors
///
/// Fails when the current executable path cannot be read or canonicalized,
/// or when running the assembled shell command reports an error.
pub(super) fn spawn_bridge_management_task(task_id: &str, db_path: &Path) -> Result<()> {
    let bridge_exe = env::current_exe()
        .context("读取当前 bridge 可执行文件路径失败")?
        .canonicalize()
        .context("解析当前 bridge 可执行文件路径失败")?;

    // The unit name carries at most 12 ASCII alphanumeric characters taken
    // from the task id; with none available the bare prefix is used.
    let unit_suffix: String = task_id
        .chars()
        .filter(char::is_ascii_alphanumeric)
        .take(12)
        .collect();
    let mut unit_name = String::from("codex-mobile-bridge-manage");
    if !unit_suffix.is_empty() {
        unit_name.push('-');
        unit_name.push_str(&unit_suffix);
    }

    let mut argv: Vec<String> = ["systemd-run", "--user", "--collect", "--quiet"]
        .into_iter()
        .map(String::from)
        .collect();
    argv.push(format!("--unit={unit_name}"));

    // The working directory is forwarded only when it can be determined.
    if let Ok(cwd) = env::current_dir() {
        argv.push(format!("--working-directory={}", cwd.to_string_lossy()));
    }

    // Forward the selected environment variables, skipping unset or blank ones.
    let forwarded_env = ["HOME", "PATH", "CODEX_BINARY", "CODEX_HOME", "RUST_LOG"]
        .into_iter()
        .filter_map(|key| {
            let value = env::var(key).ok()?;
            (!value.trim().is_empty()).then(|| format!("--setenv={key}={value}"))
        });
    argv.extend(forwarded_env);

    argv.push(bridge_exe.to_string_lossy().into_owned());
    argv.extend(["manage", "run-task", "--task-id", task_id, "--db-path"].map(String::from));
    argv.push(db_path.to_string_lossy().into_owned());

    let shell_command = argv
        .iter()
        .map(|argument| environment::shell_quote_value(argument))
        .collect::<Vec<_>>()
        .join(" ");
    environment::run_shell_capture(&shell_command)?;
    Ok(())
}
/// Loads the management task named by `args.task_id` from the existing
/// database at `args.db_path` and executes it with a
/// `BridgeManagementRunner`.
///
/// # Errors
///
/// Fails when the storage cannot be opened, when the task lookup fails or
/// finds no task, or when the runner itself returns an error.
pub(super) fn run_bridge_management_task(args: RunTaskArgs) -> Result<()> {
    let storage = Storage::open_existing(args.db_path.clone())?;
    let lookup = storage.get_bridge_management_task(&args.task_id)?;
    let task = lookup.with_context(|| format!("未找到 bridge 管理任务: {}", args.task_id))?;
    BridgeManagementRunner::new(storage, task).run()
}
/// Produces a remote inspection report by delegating to
/// `inspect::inspect_remote_state`.
///
/// `_db_path` is accepted but not used by this function.
pub(super) fn inspect_remote_state(
_db_path: &Path,
) -> Result<crate::bridge_protocol::RemoteInspectionReport> {
inspect::inspect_remote_state()
}
pub(super) fn current_bridge_management_snapshot(_db_path: &Path) -> Result<ManagedBridgeSnapshot> {
let paths = release::managed_paths()?;
let mut warnings: Vec<String> = Vec::new();
let install_record =
environment::read_install_record(&paths)?.map(managed_install_record_from_record);
let (service_status, service_warning) = inspect_service_status()?;
if let Some(warning) = service_warning {
warnings.push(warning);
}
let current_link_target = canonical_string_path(&paths.current_link);
let current_binary_sha256 = current_link_target
.as_deref()
.map(Path::new)
.map(read_release_binary_sha256)
.transpose()
.map_err(|error| anyhow::anyhow!("读取当前 release sha256 失败: {error}"))?;
let unit_file_exists = paths.unit_file.is_file();
let env_file_exists = paths.env_file.is_file();
let listen_addr = managed_listen_addr(&paths)?;
let bridge_health = match read_local_bridge_health(&listen_addr) {
Ok(health) => Some(health),
Err(error) => {
if service_status.active {
warnings.push(format!("本地 /health 检查失败: {error}"));
}
None
}
};
let available_release = match release::resolve_latest_registry_version(
&paths,
DEFAULT_MANAGEMENT_CARGO_BINARY,
DEFAULT_MANAGEMENT_REGISTRY,
) {
Ok(version) => Some(ManagedBridgeReleaseDescriptor {
crate_name: CRATE_NAME.to_string(),
version,
registry: DEFAULT_MANAGEMENT_REGISTRY.to_string(),
}),
Err(error) => {
warnings.push(format!("查询最新 bridge release 失败: {error}"));
None
}
};
let observed_bridge_version = bridge_health
.as_ref()
.and_then(|snapshot| snapshot.bridge_version.clone())
.or_else(|| {
install_record
.as_ref()
.and_then(|record| record.current_version.clone())
});
let current_release_expected = install_record
.as_ref()
.and_then(|record| record.current_release_path.clone())
.filter(|value| !value.trim().is_empty());
let current_sha_expected = install_record
.as_ref()
.and_then(|record| record.current_sha256.clone())
.filter(|value| !value.trim().is_empty());
let can_update = available_release
.as_ref()
.and_then(|release| {
observed_bridge_version
.as_deref()
.map(|current| is_version_newer(&release.version, current))
})
.unwrap_or(false);
let needs_repair = install_record
.as_ref()
.is_some_and(|record| record.install_state == "installed")
&& (!service_status.installed
|| !unit_file_exists
|| !env_file_exists
|| current_link_target.is_none()
|| current_release_expected.is_none()
|| current_link_target != current_release_expected
|| matches!(
(current_sha_expected.as_ref(), current_binary_sha256.as_ref()),
(Some(expected), Some(observed)) if observed != expected
));
let can_rollback = install_record.as_ref().is_some_and(|record| {
record
.previous_release_path
.as_ref()
.is_some_and(|value| !value.trim().is_empty())
|| record
.previous_version
.as_ref()
.is_some_and(|value| !value.trim().is_empty())
});
Ok(ManagedBridgeSnapshot {
available_release,
install_record,
service_status,
bridge_health,
current_link_target,
current_binary_sha256,
unit_file_exists,
env_file_exists,
can_update,
needs_repair,
can_rollback,
warnings,
})
}
fn managed_install_record_from_record(record: InstallRecord) -> ManagedInstallRecord {
ManagedInstallRecord {
install_state: record.install_state,
current_artifact_id: record.current_artifact_id,
current_version: record.current_version,
current_build_hash: record.current_build_hash,
current_sha256: record.current_sha256,
current_protocol_version: record.current_protocol_version.map(|value| value as i32),
current_release_path: record.current_release_path,
previous_artifact_id: record.previous_artifact_id,
previous_version: record.previous_version,
previous_build_hash: record.previous_build_hash,
previous_sha256: record.previous_sha256,
previous_protocol_version: record.previous_protocol_version.map(|value| value as i32),
previous_release_path: record.previous_release_path,
last_operation: record.last_operation,
last_operation_status: record.last_operation_status,
last_operation_at_ms: record.last_operation_at_ms,
installed_at_ms: record.installed_at_ms,
updated_at_ms: record.updated_at_ms,
}
}
/// Queries `systemctl --user show` for the bridge service and converts the
/// reported properties into a [`ManagedServiceStatus`].
///
/// Returns the status together with an optional warning. The warning is set
/// when `systemctl` produced no `KEY=VALUE` output at all but did write
/// something to stderr.
///
/// Missing properties become empty strings, so `installed`, `active` and
/// `enabled` are all `false` when nothing could be read. `exec_main_status`
/// is `None` when the property is absent or not an integer.
///
/// # Errors
///
/// Fails only when `environment::run_shell_capture` itself returns an error.
pub(super) fn inspect_service_status() -> Result<(ManagedServiceStatus, Option<String>)> {
    // stderr must NOT be redirected to /dev/null here: the warning below is
    // built from `capture.stderr`, and discarding it in the shell would make
    // that branch unreachable. `|| true` keeps the shell's exit status at zero
    // even when systemctl fails (presumably so the capture helper does not
    // treat a failing systemctl as a hard error — confirm against
    // `run_shell_capture`).
    let capture = environment::run_shell_capture(&format!(
        "systemctl --user show {SERVICE_NAME} --property=LoadState --property=ActiveState --property=UnitFileState --property=SubState --property=Result --property=ExecMainStatus --no-pager || true"
    ))?;
    let fields = parse_key_value_lines(&capture.stdout);
    let field = |name: &str| fields.get(name).cloned().unwrap_or_default();

    let load_state = field("LoadState");
    let active_state = field("ActiveState");
    let unit_file_state = field("UnitFileState");
    let sub_state = field("SubState");
    let result = field("Result");

    // Only report stderr when systemctl gave us nothing usable on stdout.
    let diagnostics = capture.stderr.trim();
    let warning = (fields.is_empty() && !diagnostics.is_empty())
        .then(|| format!("读取 systemd user service 状态失败: {diagnostics}"));

    Ok((
        ManagedServiceStatus {
            installed: load_state == "loaded",
            active: active_state == "active",
            enabled: unit_file_state == "enabled",
            load_state,
            active_state,
            unit_file_state,
            sub_state,
            result,
            exec_main_status: fields
                .get("ExecMainStatus")
                .and_then(|value| value.parse::<i32>().ok()),
            checked_at_ms: now_millis(),
        },
        warning,
    ))
}
/// Parses `KEY=VALUE` lines into a map.
///
/// Each line is trimmed and split at its first `=`. Lines without an `=`
/// (including blank lines) are skipped. When a key appears more than once
/// the last occurrence wins.
fn parse_key_value_lines(raw: &str) -> HashMap<String, String> {
    let mut fields = HashMap::new();
    for line in raw.lines() {
        if let Some((key, value)) = line.trim().split_once('=') {
            fields.insert(key.to_string(), value.to_string());
        }
    }
    fields
}
/// Canonicalizes `path` and returns it as a (lossily converted) string.
///
/// Returns `None` when canonicalization fails.
pub(super) fn canonical_string_path(path: &Path) -> Option<String> {
    let resolved = path.canonicalize().ok()?;
    Some(resolved.to_string_lossy().into_owned())
}
/// Computes the sha256 of the bridge binary located under `release_root`.
///
/// # Errors
///
/// Fails when `environment::release_binary_path` yields no binary for the
/// release, or when hashing the file fails.
fn read_release_binary_sha256(release_root: &Path) -> Result<String> {
    environment::release_binary_path(release_root)
        .with_context(|| format!("release 缺少 bridge 二进制: {}", release_root.display()))
        .and_then(|binary| records::sha256_file(&binary))
}
/// Returns the listen address configured for the managed bridge.
///
/// Reads `CODEX_MOBILE_LISTEN_ADDR` from the managed env file; when the key
/// is missing or its trimmed value is empty, `DEFAULT_LISTEN_ADDR` is used.
///
/// # Errors
///
/// Fails when the env file cannot be read.
pub(super) fn managed_listen_addr(paths: &ManagedPaths) -> Result<String> {
    let env_values = environment::read_env_file(paths)?;
    let configured = env_values
        .get("CODEX_MOBILE_LISTEN_ADDR")
        .map(|raw| raw.trim());
    let listen_addr = match configured {
        Some(value) if !value.is_empty() => value,
        _ => DEFAULT_LISTEN_ADDR,
    };
    Ok(listen_addr.to_string())
}
pub(super) fn failure_code_for_phase(phase: &BridgeManagementPhase) -> &'static str {
match phase {
BridgeManagementPhase::ResolveTarget => "resolve_target_failed",
BridgeManagementPhase::InstallRelease => "install_release_failed",
BridgeManagementPhase::ActivateRelease => "activate_release_failed",
BridgeManagementPhase::ControlService => "control_service_failed",
BridgeManagementPhase::CleanupManagedFiles => "cleanup_managed_files_failed",
BridgeManagementPhase::RestartService => "restart_service_failed",
BridgeManagementPhase::VerifyHealth => "verify_health_failed",
BridgeManagementPhase::Done => "bridge_management_failed",
}
}
/// Reports whether `candidate` is a strictly newer semantic version than
/// `current`.
///
/// Returns `false` when either string cannot be parsed as a version.
pub(super) fn is_version_newer(candidate: &str, current: &str) -> bool {
    match (parse_semver(candidate), parse_semver(current)) {
        (Ok(candidate_version), Ok(current_version)) => candidate_version > current_version,
        _ => false,
    }
}
/// Parses `value` as a semantic version after trimming surrounding
/// whitespace and stripping any leading `v` characters.
///
/// # Errors
///
/// Fails, with the original input included in the message, when the
/// normalized text is not a valid version.
fn parse_semver(value: &str) -> Result<Version> {
    let normalized = value.trim().trim_start_matches('v');
    normalized
        .parse::<Version>()
        .with_context(|| format!("解析版本号失败: {value}"))
}