use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::thread;
use std::time::{Duration as StdDuration, Instant};
use anyhow::{Context, Result, bail};
use super::*;
use crate::bridge_protocol::ManagedBridgeHealthSnapshot;
/// Timeout used for connecting to the `/health` socket and installed as its read and write timeout.
const HEALTH_CHECK_TIMEOUT: StdDuration = StdDuration::from_secs(3);
/// Overall budget `wait_for_expected_health` polls for; no new attempt starts once it has elapsed.
const HEALTH_VERIFY_DEADLINE: StdDuration = StdDuration::from_secs(45);
/// Pause inserted after each polling attempt in `wait_for_expected_health`.
const HEALTH_VERIFY_INTERVAL: StdDuration = StdDuration::from_secs(1);
/// Fetches the health snapshot from the bridge listening on `listen_addr`.
///
/// Every socket address that `listen_addr` resolves to is tried in order and the
/// first successful response is returned.
///
/// # Errors
///
/// Fails when the address cannot be normalized or resolved, or when every
/// resolved address fails. In the latter case the message carries the full
/// cause chain of the last failure.
pub(super) fn read_local_bridge_health(listen_addr: &str) -> Result<ManagedBridgeHealthSnapshot> {
    let mut last_error = None;
    for address in resolve_health_addresses(listen_addr)? {
        match read_health_from_socket(address) {
            Ok(health) => return Ok(health),
            // `{:#}` renders the whole anyhow context chain. `to_string()` keeps only
            // the outermost context and would drop the underlying I/O or JSON error.
            Err(error) => last_error = Some(format!("{error:#}")),
        }
    }
    bail!(
        "无法读取本地 bridge /health: {}",
        last_error.unwrap_or_else(|| "没有可用地址".to_string())
    )
}
/// Resolves `listen_addr` into the socket addresses a health probe should try.
///
/// The address is first passed through `normalize_health_target` and then
/// resolved with `ToSocketAddrs`.
///
/// # Errors
///
/// Fails when normalization fails, when resolution fails, or when resolution
/// succeeds but yields no address.
fn resolve_health_addresses(listen_addr: &str) -> Result<Vec<SocketAddr>> {
    let normalized = normalize_health_target(listen_addr)?;
    let resolved: Vec<SocketAddr> = normalized
        .to_socket_addrs()
        .with_context(|| format!("解析 bridge 监听地址失败: {normalized}"))?
        .collect();
    if !resolved.is_empty() {
        return Ok(resolved);
    }
    bail!("bridge 监听地址没有可用解析结果: {normalized}")
}
/// Rewrites a bridge listen address into one a local client can connect to.
///
/// Wildcard bind hosts are replaced by the matching loopback address: an empty
/// host, `*` or `0.0.0.0` becomes `127.0.0.1`, and any spelling of the IPv6
/// unspecified address (`::`, `::0`, `0:0:0:0:0:0:0:0`, ...) becomes `::1`.
/// Every other host is passed through unchanged. A host containing `:` is
/// emitted in `[host]:port` form.
///
/// # Errors
///
/// Fails when a bracketed address lacks the `]:` separator, or when an
/// unbracketed address contains no `:` to separate the port.
fn normalize_health_target(listen_addr: &str) -> Result<String> {
    // Compare the parsed address rather than a literal so that non-canonical
    // spellings of the IPv6 wildcard are recognised as well.
    let is_unspecified_v6 = |host: &str| {
        matches!(host.parse::<std::net::Ipv6Addr>(), Ok(ip) if ip.is_unspecified())
    };

    if let Some(rest) = listen_addr.strip_prefix('[') {
        let (host, port) = rest.split_once("]:").context("解析 IPv6 监听地址失败")?;
        let host = if is_unspecified_v6(host) { "::1" } else { host };
        return Ok(format!("[{host}]:{port}"));
    }

    // Split on the last colon so an unbracketed IPv6 host keeps its own colons.
    let (host, port) = listen_addr.rsplit_once(':').context("监听地址缺少端口")?;
    let normalized_host = match host {
        "0.0.0.0" | "" | "*" => "127.0.0.1",
        value if is_unspecified_v6(value) => "::1",
        value => value,
    };
    if normalized_host.contains(':') {
        Ok(format!("[{normalized_host}]:{port}"))
    } else {
        Ok(format!("{normalized_host}:{port}"))
    }
}
/// Performs one blocking `GET /health` request against `address` and decodes the JSON body.
///
/// The connection attempt is bounded by `HEALTH_CHECK_TIMEOUT`, and the same
/// value is installed as the socket's read and write timeout. The request asks
/// for `Connection: close`, and the response is read until end of stream.
///
/// # Errors
///
/// Fails on connect or socket I/O errors, on a response that is not valid UTF-8
/// or lacks the blank line separating headers from body, on a status code other
/// than 200, and on a body that does not deserialize into the snapshot type.
fn read_health_from_socket(address: SocketAddr) -> Result<ManagedBridgeHealthSnapshot> {
    let mut stream = TcpStream::connect_timeout(&address, HEALTH_CHECK_TIMEOUT)
        .with_context(|| format!("连接本地 /health 失败: {address}"))?;
    stream
        .set_read_timeout(Some(HEALTH_CHECK_TIMEOUT))
        .with_context(|| format!("设置读取超时失败: {address}"))?;
    stream
        .set_write_timeout(Some(HEALTH_CHECK_TIMEOUT))
        .with_context(|| format!("设置写入超时失败: {address}"))?;
    stream
        .write_all(b"GET /health HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
        .with_context(|| format!("写入 /health 请求失败: {address}"))?;

    let mut response = Vec::new();
    stream
        .read_to_end(&mut response)
        .with_context(|| format!("读取 /health 响应失败: {address}"))?;
    let response = String::from_utf8(response).context("解析 /health 响应失败: 非 UTF-8")?;
    let (headers, body) = response
        .split_once("\r\n\r\n")
        .context("本地 /health 响应不完整")?;

    let status_line = headers.lines().next().unwrap_or_default();
    // Compare the status-code token itself. A substring search for " 200 " rejects
    // a status line without a reason phrase ("HTTP/1.1 200") and accepts any line
    // that merely contains 200 somewhere after the real status code.
    let status_code = status_line.split_whitespace().nth(1);
    if status_code != Some("200") {
        bail!("本地 /health 返回非 200: {status_line}");
    }
    serde_json::from_str(body).context("解析 /health JSON 失败")
}
/// Polls the local bridge `/health` endpoint until it reports the expected state.
///
/// A snapshot is accepted when `health.ok` is true and, for each expectation
/// that is `Some`, the reported bridge version / protocol version equals it.
/// An expectation of `None` places no constraint on that field. Attempts are
/// separated by `HEALTH_VERIFY_INTERVAL` and no new attempt starts once
/// `HEALTH_VERIFY_DEADLINE` has elapsed.
///
/// # Errors
///
/// Fails when the listen address cannot be determined from `paths`, or when
/// the deadline passes without an acceptable snapshot. The timeout message
/// includes the most recent failure or mismatch.
pub(super) fn wait_for_expected_health(
    paths: &ManagedPaths,
    expected_version: Option<&str>,
    expected_protocol: Option<i32>,
) -> Result<ManagedBridgeHealthSnapshot> {
    let listen_addr = managed_listen_addr(paths)?;
    let deadline = Instant::now() + HEALTH_VERIFY_DEADLINE;
    let mut last_error = None;
    while Instant::now() < deadline {
        match read_local_bridge_health(&listen_addr) {
            Ok(health) => {
                let version_ready = expected_version
                    .map_or(true, |value| health.bridge_version.as_deref() == Some(value));
                let protocol_ready =
                    expected_protocol.map_or(true, |value| health.protocol_version == Some(value));
                if health.ok && version_ready && protocol_ready {
                    return Ok(health);
                }
                // Report `ok` too: the snapshot is also rejected when only that flag is
                // false, and without it the message would point at version/protocol.
                last_error = Some(format!(
                    "health 已返回但尚未达到目标版本/协议: version={} protocol={} ok={}",
                    health.bridge_version.as_deref().unwrap_or("-"),
                    health
                        .protocol_version
                        .map(|value| value.to_string())
                        .unwrap_or_else(|| "-".to_string()),
                    health.ok,
                ));
            }
            // `{:#}` keeps the anyhow cause chain; `to_string()` would show only the
            // outermost context.
            Err(error) => last_error = Some(format!("{error:#}")),
        }
        thread::sleep(HEALTH_VERIFY_INTERVAL);
    }
    bail!(
        "等待 bridge /health 就绪超时: {}",
        last_error.unwrap_or_else(|| "未知原因".to_string())
    )
}
/// Renders a one-line summary of a health snapshot: bridge version, protocol
/// version and runtime count.
///
/// A missing bridge version or protocol version is shown as `-`.
pub(super) fn success_detail(health: &ManagedBridgeHealthSnapshot) -> String {
    let version = health.bridge_version.as_deref().unwrap_or("-");
    let protocol = health
        .protocol_version
        .map_or_else(|| "-".to_string(), |value| value.to_string());
    format!(
        "bridgeVersion={} protocol={} runtimes={}",
        version, protocol, health.runtime_count,
    )
}